1#![deny(missing_docs)]
2#[macro_use]
5extern crate diesel;
6#[macro_use]
7extern crate diesel_migrations;
8use tracing::{debug, error, info, trace, warn};
9
10use diesel::prelude::*;
11use diesel::{insert_into, sql_query};
12
13use metrics::{GaugeValue, Key, KeyName, SetRecorderError, SharedString, Unit};
14
15use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
16use std::sync::Mutex;
17use std::{
18 collections::{HashMap, VecDeque},
19 path::{Path, PathBuf},
20 sync::mpsc::{Receiver, RecvTimeoutError, SyncSender},
21 thread::{self, JoinHandle},
22 time::{Duration, Instant, SystemTime},
23};
24use thiserror::Error;
25
// --- Batching ---------------------------------------------------------------

/// Number of queued samples at which the worker flushes early, without waiting for the
/// flush interval to elapse.
const FLUSH_QUEUE_LIMIT: usize = 1_000;
/// Capacity of the bounded channel carrying events to the worker thread.
const BACKGROUND_CHANNEL_LIMIT: usize = 8_000;
/// SQLite's historical default for the maximum number of bound parameters in one statement.
const SQLITE_DEFAULT_MAX_VARIABLES: usize = 999;
/// Bound parameters per inserted row (timestamp, metric key id, value).
const METRIC_FIELDS_PER_ROW: usize = 3;
/// Rows per INSERT statement, chosen so one statement never exceeds the parameter limit.
const INSERT_BATCH_SIZE: usize = SQLITE_DEFAULT_MAX_VARIABLES / METRIC_FIELDS_PER_ROW;

// --- Failure handling -------------------------------------------------------

/// Upper bound on samples kept in memory while flushes keep failing; the oldest are
/// discarded beyond this.
const QUEUE_HARD_LIMIT: usize = 100_000;
/// Consecutive flush failures after which the database connection is rebuilt.
const RECONNECT_AFTER_FAILURES: u64 = 3;
/// Minimum spacing between two reconnection attempts.
const RECONNECT_BACKOFF: Duration = Duration::from_secs(30);
/// Window used by the log throttles: at most one message of a kind per window.
const ERROR_LOG_INTERVAL: Duration = Duration::from_secs(60);
43
/// Rate limiter for repetitive log messages.
///
/// Lets at most one message through per `interval`. Calls made in between are counted, so
/// the next message that is let through can report how many were swallowed.
struct LogThrottle {
    /// Minimum spacing between two messages that are let through.
    interval: Duration,
    /// When a message was last let through; `None` before the first one.
    last_logged: Option<Instant>,
    /// Number of calls refused since `last_logged`.
    suppressed: u64,
}

impl LogThrottle {
    /// Creates a throttle whose very first [`LogThrottle::allow`] call always succeeds.
    const fn new(interval: Duration) -> Self {
        Self {
            last_logged: None,
            suppressed: 0,
            interval,
        }
    }

    /// Decides whether a message may be logged right now.
    ///
    /// Returns `Some(n)` when it may, where `n` is the number of calls refused since the
    /// previous permitted one (the counter is then reset). Returns `None` while still inside
    /// the interval, counting this call as suppressed.
    fn allow(&mut self) -> Option<u64> {
        let now = Instant::now();
        match self.last_logged {
            Some(previous) if now.duration_since(previous) < self.interval => {
                self.suppressed += 1;
                None
            }
            _ => {
                self.last_logged = Some(now);
                let skipped = self.suppressed;
                self.suppressed = 0;
                Some(skipped)
            }
        }
    }

    /// Invokes `emit` with the suppressed-call count if [`LogThrottle::allow`] permits a
    /// message now; otherwise does nothing besides counting the call.
    fn log_if_due(&mut self, emit: impl FnOnce(u64)) {
        if let Some(skipped) = self.allow() {
            emit(skipped);
        }
    }
}
85
86#[derive(Debug, Error)]
88pub enum MetricsError {
89 #[error("Database error: {0}")]
91 DbConnectionError(#[from] ConnectionError),
92 #[error("Migration error: {0}")]
94 MigrationError(Box<dyn std::error::Error + Send + Sync>),
95 #[error("Error querying DB: {0}")]
97 QueryError(#[from] diesel::result::Error),
98 #[error("Invalid database path")]
100 InvalidDatabasePath,
101 #[cfg(feature = "csv")]
103 #[error("IO Error: {0}")]
104 IoError(#[from] std::io::Error),
105 #[cfg(feature = "csv")]
107 #[error("CSV Error: {0}")]
108 CsvError(#[from] csv::Error),
109 #[error("Database has no metrics stored in it")]
111 EmptyDatabase,
112 #[error("Metric key {0} not found in database")]
114 KeyNotFound(String),
115 #[error("Exporter task has been stopped or crashed")]
117 ExporterUnavailable,
118 #[error("Session for signpost `{0}` has zero duration")]
120 ZeroLengthSession(String),
121 #[error("No metrics recorded for `{0}` in requested session")]
123 NoMetricsForKey(String),
124}
125
126impl MetricsError {
127 fn is_malformed_db(&self) -> bool {
129 self.to_string().contains("malformed")
130 }
131}
132
133pub type Result<T, E = MetricsError> = std::result::Result<T, E>;
135
136mod metrics_db;
137mod models;
138mod recorder;
139mod schema;
140
141use crate::metrics_db::query;
142pub use metrics_db::{MetricsDb, Session};
143pub use models::{Metric, MetricKey, NewMetric};
144
/// Schema migrations embedded into the binary at compile time by `embed_migrations!`.
/// `setup_db` applies any pending ones whenever it opens a connection.
pub(crate) const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
146
/// Row returned by `PRAGMA quick_check`; `setup_db` uses it to detect a corrupt database.
#[derive(QueryableByName)]
struct PragmaCheckResult {
    /// The `quick_check` column: `"ok"` for a healthy database, otherwise a description of
    /// a problem found.
    #[diesel(sql_type = diesel::sql_types::Text)]
    #[diesel(column_name = quick_check)]
    result: String,
}
153
154fn remove_db_files(path: &Path) {
156 let db_path = PathBuf::from(path);
157 for suffix in &["", "-wal", "-shm"] {
158 let mut file_path = db_path.clone().into_os_string();
159 file_path.push(suffix);
160 let file_path = PathBuf::from(file_path);
161 if file_path.exists() {
162 if let Err(e) = std::fs::remove_file(&file_path) {
163 error!("Failed to remove {}: {}", file_path.display(), e);
164 } else {
165 info!("Removed corrupt database file: {}", file_path.display());
166 }
167 }
168 }
169}
170
171fn setup_db<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
172 let url = path
173 .as_ref()
174 .to_str()
175 .ok_or(MetricsError::InvalidDatabasePath)?;
176 let mut db = SqliteConnection::establish(url)?;
177
178 sql_query("PRAGMA journal_mode=WAL;").execute(&mut db)?;
180
181 sql_query("PRAGMA busy_timeout = 5000;").execute(&mut db)?;
183
184 db.run_pending_migrations(MIGRATIONS)
185 .map_err(MetricsError::MigrationError)?;
186
187 let check: String = sql_query("PRAGMA quick_check;")
189 .get_result::<PragmaCheckResult>(&mut db)?
190 .result;
191 if check != "ok" {
192 return Err(MetricsError::QueryError(
193 diesel::result::Error::DatabaseError(
194 diesel::result::DatabaseErrorKind::Unknown,
195 Box::new(format!("database disk image is malformed: {check}")),
196 ),
197 ));
198 }
199
200 Ok(db)
201}
202
203fn setup_db_or_reset<P: AsRef<Path>>(path: P) -> Result<(SqliteConnection, bool)> {
208 let path = path.as_ref();
209 match setup_db(path) {
210 Ok(db) => Ok((db, false)),
211 Err(err) if err.is_malformed_db() => {
212 warn!(
213 "Database is malformed, removing and recreating: {}",
214 path.display()
215 );
216 remove_db_files(path);
217 setup_db(path).map(|db| (db, true))
218 }
219 Err(err) => Err(err),
220 }
221}
/// Kind of metric carried by an `Event::DescribeKey`.
///
/// NOTE(review): the worker currently ignores this value (it binds it as `_key_type`).
enum RegisterType {
    /// Counter metric.
    Counter,
    /// Gauge metric.
    Gauge,
    /// Histogram metric.
    Histogram,
}
227
/// Messages sent to the worker thread over its bounded channel.
///
/// The `Duration` in the metric variants is the sample timestamp. It is stored as seconds,
/// and housekeeping compares it with `SystemTime::UNIX_EPOCH.elapsed()`, so it is expected
/// to be measured from the Unix epoch.
enum Event {
    /// Flush pending samples and terminate the worker.
    Stop,
    /// Create or update the database entry for a key: kind, name, optional unit, description.
    DescribeKey(RegisterType, KeyName, Option<Unit>, SharedString),
    /// Add to a counter (timestamp, key, increment); the worker stores the running total.
    IncrementCounter(Duration, Key, u64),
    /// Set a counter to an absolute value (timestamp, key, value).
    AbsoluteCounter(Duration, Key, u64),
    /// Set, increment or decrement a gauge (timestamp, key, change).
    UpdateGauge(Duration, Key, GaugeValue),
    /// Record one histogram sample as-is (timestamp, key, sample).
    UpdateHistogram(Duration, Key, f64),
    /// Replace the periodic housekeeping configuration.
    SetHousekeeping {
        retention_period: Option<Duration>,
        housekeeping_period: Option<Duration>,
        record_limit: Option<usize>,
    },
    /// Flush pending samples, then summarise `keys` for the session starting at
    /// `signpost_key` and reply through `tx`.
    RequestSummaryFromSignpost {
        signpost_key: String,
        keys: Vec<String>,
        tx: tokio::sync::oneshot::Sender<Result<HashMap<String, f64>>>,
    },
}
246
247pub struct SqliteExporterHandle {
249 sender: SyncSender<Event>,
250}
251impl SqliteExporterHandle {
252 pub fn request_average_metrics(
254 &self,
255 from_signpost: &str,
256 with_keys: &[&str],
257 ) -> Result<HashMap<String, f64>> {
258 let (tx, rx) = tokio::sync::oneshot::channel();
259 self.sender
260 .send(Event::RequestSummaryFromSignpost {
261 signpost_key: from_signpost.to_string(),
262 keys: with_keys.iter().map(|s| s.to_string()).collect(),
263 tx,
264 })
265 .map_err(|_| MetricsError::ExporterUnavailable)?;
266 match rx.blocking_recv() {
267 Ok(metrics) => Ok(metrics?),
268 Err(_) => Err(MetricsError::ExporterUnavailable),
269 }
270 }
271}
272
273pub struct SqliteExporter {
275 thread: Option<JoinHandle<()>>,
276 sender: SyncSender<Event>,
277 send_error_throttle: Mutex<LogThrottle>,
278}
/// State owned by the worker thread: the database connection plus in-memory buffers.
struct InnerState {
    /// Live connection; replaced by `reconnect`.
    db: SqliteConnection,
    /// Database location, kept so the connection can be re-established.
    db_path: PathBuf,
    /// When periodic housekeeping last ran or was (re)configured.
    last_housekeeping: Instant,
    /// Housekeeping period; `None` disables periodic housekeeping.
    housekeeping: Option<Duration>,
    /// Maximum age of stored samples, enforced during housekeeping.
    retention: Option<Duration>,
    /// Maximum number of stored rows, enforced during housekeeping.
    record_limit: Option<usize>,
    /// Interval after which queued samples are flushed.
    flush_duration: Duration,
    /// When the queue was last flushed successfully (or found empty).
    last_flush: Instant,
    /// Current value of each gauge, so increments/decrements can be applied.
    last_values: HashMap<Key, f64>,
    /// Running total of each counter.
    counters: HashMap<Key, u64>,
    /// Cache of metric key name -> database key id.
    key_ids: HashMap<String, i64>,
    /// Samples waiting to be written to the database.
    queue: VecDeque<NewMetric>,
    /// Flush failures since the last successful flush or reconnect.
    consecutive_flush_failures: u64,
    /// When a reconnect was last attempted; used for backoff.
    last_reconnect: Option<Instant>,
}
impl InnerState {
    /// Creates worker state around an already initialised connection.
    ///
    /// Housekeeping starts disabled; the flush and housekeeping clocks both start now.
    fn new(flush_duration: Duration, db: SqliteConnection, db_path: PathBuf) -> Self {
        InnerState {
            db,
            db_path,
            last_housekeeping: Instant::now(),
            housekeeping: None,
            retention: None,
            record_limit: None,
            flush_duration,
            last_flush: Instant::now(),
            last_values: HashMap::new(),
            counters: HashMap::new(),
            key_ids: HashMap::new(),
            queue: VecDeque::with_capacity(FLUSH_QUEUE_LIMIT),
            consecutive_flush_failures: 0,
            last_reconnect: None,
        }
    }
    /// Replaces the periodic housekeeping configuration and restarts the housekeeping clock.
    fn set_housekeeping(
        &mut self,
        retention: Option<Duration>,
        housekeeping_duration: Option<Duration>,
        record_limit: Option<usize>,
    ) {
        self.retention = retention;
        self.housekeeping = housekeeping_duration;
        self.last_housekeeping = Instant::now();
        self.record_limit = record_limit;
    }
    /// Returns `true` when periodic housekeeping is enabled and its period has elapsed.
    fn should_housekeep(&self) -> bool {
        match self.housekeeping {
            Some(duration) => self.last_housekeeping.elapsed() > duration,
            None => false,
        }
    }
    /// Runs retention / record-limit cleanup (without VACUUM) and restarts the clock.
    ///
    /// Always returns `Ok(())`: `SqliteExporter::housekeeping` logs its own failures
    /// instead of returning them.
    fn housekeep(&mut self) -> Result<(), diesel::result::Error> {
        SqliteExporter::housekeeping(&mut self.db, self.retention, self.record_limit, false);
        self.last_housekeeping = Instant::now();
        Ok(())
    }
    /// Returns `true` when the flush interval has elapsed or the queue has reached
    /// `FLUSH_QUEUE_LIMIT` samples.
    fn should_flush(&self) -> bool {
        if self.last_flush.elapsed() > self.flush_duration {
            true
        } else if self.queue.len() >= FLUSH_QUEUE_LIMIT {
            debug!("Flushing due to queue size ({} items)", self.queue.len());
            true
        } else {
            false
        }
    }
    /// Writes every queued sample to the database in a single transaction.
    ///
    /// On success the queue is emptied and the failure counter reset. On failure the queue
    /// is kept for a retry, trimmed to `QUEUE_HARD_LIMIT`, and the failure counter
    /// incremented. A fatal error, or `RECONNECT_AFTER_FAILURES` consecutive failures,
    /// triggers a (backoff-limited) reconnect. The original error is returned either way.
    fn flush(&mut self) -> Result<(), diesel::result::Error> {
        if self.queue.is_empty() {
            self.last_flush = Instant::now();
            return Ok(());
        }
        // A VecDeque's contents may wrap around its buffer; passing both halves avoids
        // copying the queue into one contiguous slice.
        let (front, back) = self.queue.as_slices();
        match Self::insert_metrics(&mut self.db, [front, back]) {
            Ok(()) => {
                self.queue.clear();
                self.last_flush = Instant::now();
                self.consecutive_flush_failures = 0;
                Ok(())
            }
            Err(e) => {
                self.consecutive_flush_failures += 1;
                // Keep memory bounded while the database is unavailable.
                self.enforce_queue_cap();
                if Self::is_connection_fatal(&e)
                    || self.consecutive_flush_failures >= RECONNECT_AFTER_FAILURES
                {
                    self.reconnect();
                }
                Err(e)
            }
        }
    }

    /// Inserts all rows from `slabs` inside one transaction.
    ///
    /// Rows are sent in chunks of `INSERT_BATCH_SIZE`, so one INSERT never exceeds SQLite's
    /// bound-parameter limit. Any failing chunk aborts the whole transaction, so either
    /// everything is written or nothing is.
    fn insert_metrics<'a, S>(
        db: &mut SqliteConnection,
        slabs: S,
    ) -> Result<(), diesel::result::Error>
    where
        S: IntoIterator<Item = &'a [NewMetric]>,
    {
        use crate::schema::metrics::dsl::metrics;
        db.transaction::<_, diesel::result::Error, _>(|db| {
            // `.max(1)` guards `chunks`, which panics on a chunk size of zero.
            let chunk_size = INSERT_BATCH_SIZE.max(1);
            for slab in slabs {
                for chunk in slab.chunks(chunk_size) {
                    insert_into(metrics).values(chunk).execute(db)?;
                }
            }
            Ok(())
        })
    }

    /// Drops the oldest queued samples so that at most `QUEUE_HARD_LIMIT` remain.
    fn enforce_queue_cap(&mut self) {
        if self.queue.len() > QUEUE_HARD_LIMIT {
            let overflow = self.queue.len() - QUEUE_HARD_LIMIT;
            self.queue.drain(..overflow);
            warn!(
                "metrics-sqlite queue exceeded {} items while flushing kept failing, dropped {} oldest metrics",
                QUEUE_HARD_LIMIT, overflow
            );
        }
    }

    /// Returns `true` for errors after which the connection is treated as unusable and
    /// reconnected immediately. Other errors only count towards `RECONNECT_AFTER_FAILURES`.
    fn is_connection_fatal(e: &diesel::result::Error) -> bool {
        matches!(e, diesel::result::Error::BrokenTransactionManager)
    }

    /// Re-opens the database connection, at most once per `RECONNECT_BACKOFF`.
    ///
    /// On success the key-id cache and failure counter are reset. If the database had to be
    /// recreated because it was corrupt, queued samples are discarded: their key ids refer
    /// to rows that no longer exist. On failure the old connection is kept and the error is
    /// logged.
    fn reconnect(&mut self) {
        if let Some(last) = self.last_reconnect {
            if last.elapsed() < RECONNECT_BACKOFF {
                return;
            }
        }
        self.last_reconnect = Some(Instant::now());
        warn!(
            "metrics-sqlite database connection is broken, reconnecting to {}",
            self.db_path.display()
        );
        match setup_db_or_reset(&self.db_path) {
            Ok((db, was_reset)) => {
                self.db = db;
                // Ids may differ on the new connection (they certainly do after a reset).
                self.key_ids.clear();
                self.consecutive_flush_failures = 0;
                if was_reset {
                    let dropped = self.queue.len();
                    if dropped > 0 {
                        warn!(
                            "metrics-sqlite database was recreated; dropping {dropped} queued metrics with stale key ids"
                        );
                        self.queue.clear();
                    }
                }
                info!("metrics-sqlite database connection re-established");
            }
            Err(e) => {
                error!("metrics-sqlite failed to reconnect to database: {:?}", e);
            }
        }
    }
    /// Appends one sample for `key` to the queue.
    ///
    /// The key's database id comes from the cache, or from `MetricKey::key_by_name` on a
    /// miss (then cached). The timestamp is stored as fractional seconds.
    ///
    /// # Errors
    ///
    /// Returns the error from `MetricKey::key_by_name` if the key id lookup fails; nothing
    /// is queued in that case.
    fn queue_metric(&mut self, timestamp: Duration, key: &str, value: f64) -> Result<()> {
        let metric_key_id = match self.key_ids.get(key) {
            Some(key) => *key,
            None => {
                debug!("Looking up {}", key);
                let key_id = MetricKey::key_by_name(key, &mut self.db)?.id;
                self.key_ids.insert(key.to_string(), key_id);
                key_id
            }
        };
        let metric = NewMetric {
            timestamp: timestamp.as_secs_f64(),
            metric_key_id,
            value: value as _,
        };
        self.queue.push_back(metric);
        Ok(())
    }

    /// Computes the summary for `metrics` over the session starting at `signpost`, using
    /// `query::metrics_summary_for_signpost_and_keys` on this connection.
    pub fn metrics_summary_for_signpost_and_keys(
        &mut self,
        signpost: String,
        metrics: Vec<String>,
    ) -> Result<HashMap<String, f64>> {
        query::metrics_summary_for_signpost_and_keys(&mut self.db, &signpost, metrics)
    }
}
490
491fn run_worker(
492 db: SqliteConnection,
493 db_path: PathBuf,
494 receiver: Receiver<Event>,
495 flush_duration: Duration,
496) -> JoinHandle<()> {
497 thread::Builder::new()
498 .name("metrics-sqlite: worker".to_string())
499 .spawn(move || {
500 let mut state = InnerState::new(flush_duration, db, db_path);
501 let mut flush_error_throttle = LogThrottle::new(ERROR_LOG_INTERVAL);
502 let mut queue_error_throttle = LogThrottle::new(ERROR_LOG_INTERVAL);
503 info!("SQLite worker started");
504 loop {
505 let time_based_flush = state.last_flush.elapsed() >= flush_duration;
507
508 let mut should_flush = false;
509 let mut should_exit = false;
510 match receiver.recv_timeout(flush_duration) {
511 Ok(Event::Stop) => {
512 info!("Stopping SQLiteExporter worker, flushing & exiting");
513 should_flush = true;
514 should_exit = true;
515 }
516 Ok(Event::SetHousekeeping {
517 retention_period,
518 housekeeping_period,
519 record_limit,
520 }) => {
521 state.set_housekeeping(retention_period, housekeeping_period, record_limit);
522 }
523 Ok(Event::DescribeKey(_key_type, key, unit, desc)) => {
524 info!("Describing key {:?}", key);
525 if let Err(e) = MetricKey::create_or_update(
526 key.as_str(),
527 unit,
528 Some(desc.as_ref()),
529 &mut state.db,
530 ) {
531 error!("Failed to create key entry: {:?}", e);
532 }
533 }
534 Ok(Event::IncrementCounter(timestamp, key, value)) => {
535 let key_name = key.name();
536 let entry = state.counters.entry(key.clone()).or_insert(0);
537 let value = {
538 *entry += value;
539 *entry
540 };
541 if let Err(e) = state.queue_metric(timestamp, key_name, value as _) {
542 queue_error_throttle.log_if_due(|suppressed| {
543 if suppressed > 0 {
544 error!(
545 "Error queueing metric: {:?} ({} similar errors suppressed in the last {}s)",
546 e,
547 suppressed,
548 ERROR_LOG_INTERVAL.as_secs()
549 );
550 } else {
551 error!("Error queueing metric: {:?}", e);
552 }
553 });
554 }
555 should_flush = state.should_flush();
556 }
557 Ok(Event::AbsoluteCounter(timestamp, key, value)) => {
558 let key_name = key.name();
559 state.counters.insert(key.clone(), value);
560 if let Err(e) = state.queue_metric(timestamp, key_name, value as _) {
561 queue_error_throttle.log_if_due(|suppressed| {
562 if suppressed > 0 {
563 error!(
564 "Error queueing metric: {:?} ({} similar errors suppressed in the last {}s)",
565 e,
566 suppressed,
567 ERROR_LOG_INTERVAL.as_secs()
568 );
569 } else {
570 error!("Error queueing metric: {:?}", e);
571 }
572 });
573 }
574 should_flush = state.should_flush();
575 }
576 Ok(Event::UpdateGauge(timestamp, key, value)) => {
577 let key_name = key.name();
578 let entry = state.last_values.entry(key.clone()).or_insert(0.0);
579 let value = match value {
580 GaugeValue::Absolute(v) => {
581 *entry = v;
582 *entry
583 }
584 GaugeValue::Increment(v) => {
585 *entry += v;
586 *entry
587 }
588 GaugeValue::Decrement(v) => {
589 *entry -= v;
590 *entry
591 }
592 };
593 if let Err(e) = state.queue_metric(timestamp, key_name, value) {
594 queue_error_throttle.log_if_due(|suppressed| {
595 if suppressed > 0 {
596 error!(
597 "Error queueing metric: {:?} ({} similar errors suppressed in the last {}s)",
598 e,
599 suppressed,
600 ERROR_LOG_INTERVAL.as_secs()
601 );
602 } else {
603 error!("Error queueing metric: {:?}", e);
604 }
605 });
606 }
607 should_flush = state.should_flush();
608 }
609 Ok(Event::UpdateHistogram(timestamp, key, value)) => {
610 let key_name = key.name();
611 if let Err(e) = state.queue_metric(timestamp, key_name, value) {
612 queue_error_throttle.log_if_due(|suppressed| {
613 if suppressed > 0 {
614 error!(
615 "Error queueing metric: {:?} ({} similar errors suppressed in the last {}s)",
616 e,
617 suppressed,
618 ERROR_LOG_INTERVAL.as_secs()
619 );
620 } else {
621 error!("Error queueing metric: {:?}", e);
622 }
623 });
624 }
625 should_flush = state.should_flush();
626 }
627 Ok(Event::RequestSummaryFromSignpost {
628 signpost_key,
629 keys,
630 tx,
631 }) => {
632 match state.flush() {
633 Ok(()) => match state
634 .metrics_summary_for_signpost_and_keys(signpost_key, keys)
635 {
636 Ok(metrics) => {
637 if tx.send(Ok(metrics)).is_err() {
638 error!(
639 "Failed to respond with metrics results, discarding"
640 );
641 }
642 }
643 Err(e) => {
644 if let Err(e) = tx.send(Err(e)) {
645 error!(
646 "Failed to respond with metrics error result, discarding: {e:?}"
647 );
648 }
649 }
650 },
651 Err(e) => {
652 let err = MetricsError::from(e);
653 error!(
654 "Failed to flush pending metrics before summary request: {err:?}"
655 );
656 if let Err(send_err) = tx.send(Err(err)) {
657 error!(
658 "Failed to respond with metrics flush error result, discarding: {send_err:?}"
659 );
660 }
661 }
662 }
663 }
664 Err(RecvTimeoutError::Timeout) => {
665 should_flush = true;
666 }
667 Err(RecvTimeoutError::Disconnected) => {
668 warn!("SQLiteExporter channel disconnected, exiting worker");
669 should_flush = true;
670 should_exit = true;
671 }
672 }
673
674 if time_based_flush || should_flush {
676 if time_based_flush {
677 debug!("Flushing due to elapsed time ({}s)", flush_duration.as_secs());
678 }
679 if let Err(e) = state.flush() {
680 if let Some(suppressed) = flush_error_throttle.allow() {
681 if suppressed > 0 {
682 error!(
683 "Error flushing metrics: {} ({} similar errors suppressed in the last {}s)",
684 e,
685 suppressed,
686 ERROR_LOG_INTERVAL.as_secs()
687 );
688 } else {
689 error!("Error flushing metrics: {}", e);
690 }
691 }
692 }
693 }
694 if state.should_housekeep() {
695 if let Err(e) = state.housekeep() {
696 error!("Failed running house keeping: {:?}", e);
697 }
698 }
699 if should_exit {
700 break;
701 }
702 }
703 })
704 .unwrap()
705}
706
707impl SqliteExporter {
708 pub fn new<P: AsRef<Path>>(
714 flush_interval: Duration,
715 keep_duration: Option<Duration>,
716 path: P,
717 ) -> Result<Self> {
718 let path = path.as_ref().to_path_buf();
719 let (mut db, _was_reset) = setup_db_or_reset(&path)?;
720 Self::housekeeping(&mut db, keep_duration, None, true);
721 let (sender, receiver) = std::sync::mpsc::sync_channel(BACKGROUND_CHANNEL_LIMIT);
722 let thread = run_worker(db, path, receiver, flush_interval);
723 let exporter = SqliteExporter {
724 thread: Some(thread),
725 sender,
726 send_error_throttle: Mutex::new(LogThrottle::new(ERROR_LOG_INTERVAL)),
727 };
728 Ok(exporter)
729 }
730
731 pub fn set_periodic_housekeeping(
736 &self,
737 periodic_duration: Option<Duration>,
738 retention: Option<Duration>,
739 record_limit: Option<usize>,
740 ) {
741 if let Err(e) = self.sender.send(Event::SetHousekeeping {
742 retention_period: retention,
743 housekeeping_period: periodic_duration,
744 record_limit,
745 }) {
746 error!("Failed to set house keeping settings: {:?}", e);
747 }
748 }
749
750 fn housekeeping(
754 db: &mut SqliteConnection,
755 keep_duration: Option<Duration>,
756 record_limit: Option<usize>,
757 vacuum: bool,
758 ) {
759 use crate::schema::metrics::dsl::*;
760 use diesel::dsl::count;
761 if let Some(keep_duration) = keep_duration {
762 match SystemTime::UNIX_EPOCH.elapsed() {
763 Ok(now) => {
764 let cutoff = now - keep_duration;
765 trace!("Deleting data {}s old", keep_duration.as_secs());
766 if let Err(e) =
767 diesel::delete(metrics.filter(timestamp.le(cutoff.as_secs_f64())))
768 .execute(db)
769 {
770 error!("Failed to remove old metrics data: {}", e);
771 }
772 if vacuum {
773 if let Err(e) = sql_query("VACUUM").execute(db) {
774 error!("Failed to vacuum SQLite DB: {:?}", e);
775 }
776 }
777 }
778 Err(e) => {
779 error!(
780 "System time error, skipping metrics-sqlite housekeeping: {}",
781 e
782 );
783 }
784 }
785 }
786 if let Some(record_limit) = record_limit {
787 trace!("Checking for records over {} limit", record_limit);
788 match metrics.select(count(id)).first::<i64>(db) {
789 Ok(records) => {
790 let records = records as usize;
791 if records > record_limit {
792 let excess = records - record_limit + (record_limit / 4); trace!(
794 "Exceeded limit! {} > {}, deleting {} oldest",
795 records, record_limit, excess
796 );
797 let query = format!(
798 "DELETE FROM metrics WHERE id IN (SELECT id FROM metrics ORDER BY timestamp ASC LIMIT {excess});"
799 );
800 if let Err(e) = sql_query(query).execute(db) {
801 error!("Failed to delete excessive records: {:?}", e);
802 }
803 }
804 }
805 Err(e) => {
806 error!("Failed to get record count: {:?}", e);
807 }
808 }
809 }
810 }
811
812 fn log_send_failure(&self, context: &str, err: &dyn std::fmt::Debug) {
815 if let Ok(mut throttle) = self.send_error_throttle.lock() {
816 if let Some(suppressed) = throttle.allow() {
817 if suppressed > 0 {
818 error!(
819 "Error sending metric {} to SQLite worker: {:?} ({} similar errors suppressed in the last {}s)",
820 context,
821 err,
822 suppressed,
823 ERROR_LOG_INTERVAL.as_secs()
824 );
825 } else {
826 error!(
827 "Error sending metric {} to SQLite worker: {:?}",
828 context, err
829 );
830 }
831 }
832 }
833 }
834
835 pub fn install(self) -> Result<SqliteExporterHandle, SetRecorderError<Self>> {
837 let handle = SqliteExporterHandle {
838 sender: self.sender.clone(),
839 };
840 metrics::set_global_recorder(self)?;
841 Ok(handle)
842 }
843}
844impl Drop for SqliteExporter {
845 fn drop(&mut self) {
846 let _ = self.sender.send(Event::Stop);
847 let _ = self.thread.take().unwrap().join();
848 }
849}
850
#[cfg(test)]
mod tests {
    use crate::{
        InnerState, LogThrottle, NewMetric, QUEUE_HARD_LIMIT, SqliteExporter, setup_db_or_reset,
    };
    use std::time::{Duration, Instant};

    /// Creates worker state backed by a fresh database file in a new temporary directory.
    ///
    /// The `TempDir` is returned so the caller keeps the directory alive for the whole test.
    fn test_state() -> (InnerState, tempfile::TempDir) {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("metrics.db");
        let (db, _was_reset) = setup_db_or_reset(&path).unwrap();
        (InnerState::new(Duration::from_secs(5), db, path), dir)
    }

    #[test]
    fn enforce_queue_cap_drops_oldest_when_over_limit() {
        let (mut state, _dir) = test_state();
        let total = QUEUE_HARD_LIMIT + 250;
        // Each value is its insertion index, so we can tell which end got trimmed.
        for i in 0..total {
            state.queue.push_back(NewMetric {
                timestamp: 0.0,
                metric_key_id: 1,
                value: i as f64,
            });
        }
        state.enforce_queue_cap();
        assert_eq!(state.queue.len(), QUEUE_HARD_LIMIT);
        // The 250 oldest entries (values 0..250) were dropped; the newest survive.
        assert_eq!(state.queue.front().unwrap().value, 250.0);
        assert_eq!(state.queue.back().unwrap().value, (total - 1) as f64);
        state.enforce_queue_cap();
        // Exactly at the limit nothing further is dropped.
        assert_eq!(state.queue.len(), QUEUE_HARD_LIMIT);
    }

    #[test]
    fn failed_flush_leaves_queue_intact() {
        use diesel::connection::SimpleConnection;
        let (mut state, _dir) = test_state();
        // Without the table every INSERT fails, forcing the flush error path.
        state
            .db
            .batch_execute("DROP TABLE metrics")
            .expect("setup: drop metrics table");
        // More rows than one insert batch.
        for i in 0..5_000 {
            state.queue.push_back(NewMetric {
                timestamp: 0.0,
                metric_key_id: 1,
                value: i as f64,
            });
        }
        let before = state.queue.len();
        assert!(state.flush().is_err(), "flush should fail");
        assert_eq!(state.queue.len(), before);
        // Order is preserved too: nothing was popped or rotated.
        assert_eq!(state.queue.front().unwrap().value, 0.0);
        assert_eq!(state.queue.back().unwrap().value, 4_999.0);
        // One failure is below RECONNECT_AFTER_FAILURES, so no reconnect reset the counter.
        assert_eq!(state.consecutive_flush_failures, 1);
    }

    #[test]
    fn reconnect_drops_queue_when_database_is_recreated() {
        use crate::setup_db;
        use diesel::connection::SimpleConnection;
        use std::io::{Seek, SeekFrom, Write};
        let (mut state, _dir) = test_state();
        for i in 0..100 {
            state.queue.push_back(NewMetric {
                timestamp: 0.0,
                metric_key_id: 1,
                value: i as f64,
            });
        }
        // Move all WAL content into the main file, so the corruption written below is what
        // the next connection actually reads.
        state
            .db
            .batch_execute("PRAGMA wal_checkpoint(TRUNCATE)")
            .expect("setup: wal checkpoint");
        // Replacing the connection drops (closes) the file-backed one.
        state.db = setup_db(":memory:").expect("setup: in-memory placeholder");
        // Overwrite 16 KiB right after SQLite's 100-byte file header. The header stays valid
        // but the pages become garbage, which setup_db is expected to report as malformed
        // (TODO confirm: exact SQLite error for this corruption).
        let mut f = std::fs::OpenOptions::new()
            .write(true)
            .open(&state.db_path)
            .expect("setup: open db file");
        f.seek(SeekFrom::Start(100))
            .expect("setup: seek past header");
        f.write_all(&[0xffu8; 16 * 1024])
            .expect("setup: write garbage pages");
        f.sync_all().expect("setup: sync garbage to disk");
        drop(f);

        state.reconnect();

        // The recreated database has new key ids, so the queued rows must be discarded.
        assert!(
            state.queue.is_empty(),
            "queue should be cleared after reset"
        );
        assert_eq!(state.consecutive_flush_failures, 0);
        assert!(state.flush().is_ok());
    }

    #[test]
    fn reconnect_rebuilds_connection_with_backoff() {
        let (mut state, _dir) = test_state();
        state.consecutive_flush_failures = 5;
        state.reconnect();
        // A successful reconnect resets the failure counter and records the attempt time.
        assert_eq!(state.consecutive_flush_failures, 0);
        let first_attempt = state.last_reconnect.expect("reconnect should run");
        assert!(state.flush().is_ok());

        // A second attempt within RECONNECT_BACKOFF is skipped: nothing is reset and the
        // first attempt's timestamp is kept.
        state.consecutive_flush_failures = 5;
        state.reconnect();
        assert_eq!(state.consecutive_flush_failures, 5);
        assert_eq!(state.last_reconnect, Some(first_attempt));
    }

    #[test]
    fn log_throttle_suppresses_and_reports_count() {
        let mut throttle = LogThrottle::new(Duration::from_millis(50));
        // The first call is always allowed and reports nothing suppressed.
        assert_eq!(throttle.allow(), Some(0));
        for _ in 0..7 {
            // Within the interval: refused and counted.
            assert_eq!(throttle.allow(), None);
        }
        std::thread::sleep(Duration::from_millis(60));
        // After the interval, the next call is allowed and reports the 7 suppressed calls.
        assert_eq!(throttle.allow(), Some(7));
        std::thread::sleep(Duration::from_millis(60));
        // The previous allowed call reset the counter.
        assert_eq!(throttle.allow(), Some(0));
    }

    /// Smoke test: five threads record metrics concurrently for ~5 s through the installed
    /// global recorder.
    ///
    /// NOTE(review): this writes `metrics.db` into the current working directory and
    /// installs the process-wide recorder (possible only once per test binary); consider a
    /// temporary directory and a shorter runtime.
    #[test]
    fn test_threading() {
        use std::thread;
        SqliteExporter::new(Duration::from_millis(500), None, "metrics.db")
            .unwrap()
            .install()
            .unwrap();
        let joins: Vec<thread::JoinHandle<()>> = (0..5)
            .map(|_| {
                thread::spawn(move || {
                    let start = Instant::now();
                    loop {
                        metrics::gauge!("rate").set(1.0);
                        metrics::counter!("hits").increment(1);
                        metrics::histogram!("histogram").record(5.0);
                        if start.elapsed().as_secs() >= 5 {
                            break;
                        }
                    }
                })
            })
            .collect();
        for j in joins {
            j.join().unwrap();
        }
    }
}