1#![deny(missing_docs)]
2#[macro_use]
5extern crate diesel;
6#[macro_use]
7extern crate diesel_migrations;
8use tracing::{debug, error, info, trace, warn};
9
10use diesel::prelude::*;
11use diesel::{insert_into, sql_query};
12
13use metrics::{GaugeValue, Key, KeyName, SetRecorderError, SharedString, Unit};
14
15use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
16use std::sync::Arc;
17use std::{
18 collections::{HashMap, VecDeque},
19 path::{Path, PathBuf},
20 sync::mpsc::{Receiver, RecvTimeoutError, SyncSender},
21 thread::{self, JoinHandle},
22 time::{Duration, Instant, SystemTime},
23};
24use thiserror::Error;
25
/// Maximum number of metrics buffered in memory before a flush is forced.
const FLUSH_QUEUE_LIMIT: usize = 1000;
/// Capacity of the bounded channel feeding the background worker thread.
const BACKGROUND_CHANNEL_LIMIT: usize = 8000;
/// SQLite's default limit on bound variables per statement.
const SQLITE_DEFAULT_MAX_VARIABLES: usize = 999;
/// Number of bound fields each inserted metric row contributes.
const METRIC_FIELDS_PER_ROW: usize = 3;
/// Rows per INSERT batch, chosen so one statement never exceeds the
/// bound-variable limit.
const INSERT_BATCH_SIZE: usize = SQLITE_DEFAULT_MAX_VARIABLES / METRIC_FIELDS_PER_ROW;
32
/// Errors that can occur while recording, storing, or querying metrics.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Failed to open a connection to the SQLite database.
    #[error("Database error: {0}")]
    DbConnectionError(#[from] ConnectionError),
    /// Running the embedded schema migrations failed.
    #[error("Migration error: {0}")]
    MigrationError(Box<dyn std::error::Error + Send + Sync>),
    /// A database query failed.
    #[error("Error querying DB: {0}")]
    QueryError(#[from] diesel::result::Error),
    /// The database path could not be converted to a UTF-8 connection URL.
    #[error("Invalid database path")]
    InvalidDatabasePath,
    /// I/O failure (CSV export feature only).
    #[cfg(feature = "csv")]
    #[error("IO Error: {0}")]
    IoError(#[from] std::io::Error),
    /// CSV serialization failure (CSV export feature only).
    #[cfg(feature = "csv")]
    #[error("CSV Error: {0}")]
    CsvError(#[from] csv::Error),
    /// The database contains no metrics at all.
    #[error("Database has no metrics stored in it")]
    EmptyDatabase,
    /// The named metric key does not exist in the database.
    #[error("Metric key {0} not found in database")]
    KeyNotFound(String),
    /// The background worker thread is gone (stopped or crashed).
    #[error("Exporter task has been stopped or crashed")]
    ExporterUnavailable,
    /// A signpost-delimited session had zero duration.
    #[error("Session for signpost `{0}` has zero duration")]
    ZeroLengthSession(String),
    /// No values were recorded for the key in the requested session.
    #[error("No metrics recorded for `{0}` in requested session")]
    NoMetricsForKey(String),
}
72
73impl MetricsError {
74 fn is_malformed_db(&self) -> bool {
76 self.to_string().contains("malformed")
77 }
78}
79
/// Convenience result type defaulting the error to [`MetricsError`].
pub type Result<T, E = MetricsError> = std::result::Result<T, E>;
82
83mod metrics_db;
84mod models;
85mod recorder;
86mod schema;
87
88use crate::metrics_db::query;
89use crate::recorder::Handle;
90pub use metrics_db::{MetricsDb, Session};
91pub use models::{Metric, MetricKey, NewMetric};
92
/// Diesel migrations embedded into the binary at compile time.
pub(crate) const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
94
/// Row shape returned by `PRAGMA quick_check;` — the text is "ok" when the
/// database passes the integrity check.
#[derive(QueryableByName)]
struct PragmaCheckResult {
    #[diesel(sql_type = diesel::sql_types::Text)]
    #[diesel(column_name = quick_check)]
    result: String,
}
101
102fn remove_db_files(path: &Path) {
104 let db_path = PathBuf::from(path);
105 for suffix in &["", "-wal", "-shm"] {
106 let mut file_path = db_path.clone().into_os_string();
107 file_path.push(suffix);
108 let file_path = PathBuf::from(file_path);
109 if file_path.exists() {
110 if let Err(e) = std::fs::remove_file(&file_path) {
111 error!("Failed to remove {}: {}", file_path.display(), e);
112 } else {
113 info!("Removed corrupt database file: {}", file_path.display());
114 }
115 }
116 }
117}
118
/// Opens (or creates) the SQLite database at `path`, applies pragmas and any
/// pending migrations, then verifies integrity with `PRAGMA quick_check`.
///
/// # Errors
/// Fails if the path is not valid UTF-8, the connection cannot be
/// established, migrations fail, or the integrity check does not report "ok".
fn setup_db<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
    let url = path
        .as_ref()
        .to_str()
        .ok_or(MetricsError::InvalidDatabasePath)?;
    let mut db = SqliteConnection::establish(url)?;

    // WAL journal mode allows readers concurrent with the worker's writes.
    sql_query("PRAGMA journal_mode=WAL;").execute(&mut db)?;

    // Wait up to 5s on a locked database instead of failing immediately.
    sql_query("PRAGMA busy_timeout = 5000;").execute(&mut db)?;

    db.run_pending_migrations(MIGRATIONS)
        .map_err(MetricsError::MigrationError)?;

    // Any result other than "ok" is surfaced with "malformed" in the message
    // so `MetricsError::is_malformed_db` matches and the caller can reset.
    let check: String = sql_query("PRAGMA quick_check;")
        .get_result::<PragmaCheckResult>(&mut db)?
        .result;
    if check != "ok" {
        return Err(MetricsError::QueryError(
            diesel::result::Error::DatabaseError(
                diesel::result::DatabaseErrorKind::Unknown,
                Box::new(format!("database disk image is malformed: {check}")),
            ),
        ));
    }

    Ok(db)
}
150
151fn setup_db_or_reset<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
153 let path = path.as_ref();
154 match setup_db(path) {
155 Ok(db) => Ok(db),
156 Err(err) if err.is_malformed_db() => {
157 warn!(
158 "Database is malformed, removing and recreating: {}",
159 path.display()
160 );
161 remove_db_files(path);
162 setup_db(path)
163 }
164 Err(err) => Err(err),
165 }
166}
/// Kind of metric being described or registered via the `metrics` facade.
enum RegisterType {
    Counter,
    Gauge,
    Histogram,
}
172
/// Messages processed by the background worker thread.
enum Event {
    /// Flush outstanding metrics and terminate the worker loop.
    Stop,
    /// Create or update a key's unit/description row in the database.
    DescribeKey(RegisterType, KeyName, Option<Unit>, SharedString),
    /// A key/handle was registered by the facade (currently a no-op).
    RegisterKey(RegisterType, Key, Arc<Handle>),
    /// Add a delta to a counter. Timestamps are seconds since the Unix epoch
    /// (they are compared against `UNIX_EPOCH.elapsed()` during housekeeping).
    IncrementCounter(Duration, Key, u64),
    /// Set a counter to an absolute value.
    AbsoluteCounter(Duration, Key, u64),
    /// Apply an absolute/increment/decrement update to a gauge.
    UpdateGauge(Duration, Key, GaugeValue),
    /// Record one histogram observation.
    UpdateHistogram(Duration, Key, f64),
    /// Configure periodic retention / record-limit housekeeping.
    SetHousekeeping {
        retention_period: Option<Duration>,
        housekeeping_period: Option<Duration>,
        record_limit: Option<usize>,
    },
    /// Flush, then compute summary values for `keys` in the session delimited
    /// by `signpost_key`, replying on the oneshot `tx`.
    RequestSummaryFromSignpost {
        signpost_key: String,
        keys: Vec<String>,
        tx: tokio::sync::oneshot::Sender<Result<HashMap<String, f64>>>,
    },
}
192
/// Handle for querying the running exporter's worker thread, obtained from
/// [`SqliteExporter::install`].
pub struct SqliteExporterHandle {
    /// Channel into the worker thread's event loop.
    sender: SyncSender<Event>,
}
197impl SqliteExporterHandle {
198 pub fn request_average_metrics(
200 &self,
201 from_signpost: &str,
202 with_keys: &[&str],
203 ) -> Result<HashMap<String, f64>> {
204 let (tx, rx) = tokio::sync::oneshot::channel();
205 self.sender
206 .send(Event::RequestSummaryFromSignpost {
207 signpost_key: from_signpost.to_string(),
208 keys: with_keys.iter().map(|s| s.to_string()).collect(),
209 tx,
210 })
211 .map_err(|_| MetricsError::ExporterUnavailable)?;
212 match rx.blocking_recv() {
213 Ok(metrics) => Ok(metrics?),
214 Err(_) => Err(MetricsError::ExporterUnavailable),
215 }
216 }
217}
218
/// A `metrics`-facade recorder that persists metrics to a SQLite database
/// from a dedicated background worker thread.
pub struct SqliteExporter {
    /// Worker join handle; taken and joined in `Drop`.
    thread: Option<JoinHandle<()>>,
    /// Channel feeding events to the worker thread.
    sender: SyncSender<Event>,
}
/// State owned exclusively by the background worker thread.
struct InnerState {
    /// Open SQLite connection.
    db: SqliteConnection,
    /// When housekeeping last ran.
    last_housekeeping: Instant,
    /// Housekeeping period; `None` disables periodic housekeeping.
    housekeeping: Option<Duration>,
    /// Retention window for old records; `None` keeps everything.
    retention: Option<Duration>,
    /// Maximum record count before the oldest rows are trimmed.
    record_limit: Option<usize>,
    /// Interval between time-based flushes.
    flush_duration: Duration,
    /// When the queue was last flushed to the database.
    last_flush: Instant,
    /// Last value per gauge key, for relative increment/decrement updates.
    last_values: HashMap<Key, f64>,
    /// Running totals per counter key.
    counters: HashMap<Key, u64>,
    /// Cache of metric key name -> database row id.
    key_ids: HashMap<String, i64>,
    /// Metrics waiting to be written to the database.
    queue: VecDeque<NewMetric>,
}
impl InnerState {
    /// Creates worker state around an open connection and flush interval.
    fn new(flush_duration: Duration, db: SqliteConnection) -> Self {
        InnerState {
            db,
            last_housekeeping: Instant::now(),
            housekeeping: None,
            retention: None,
            record_limit: None,
            flush_duration,
            last_flush: Instant::now(),
            last_values: HashMap::new(),
            counters: HashMap::new(),
            key_ids: HashMap::new(),
            queue: VecDeque::with_capacity(FLUSH_QUEUE_LIMIT),
        }
    }
    /// Updates housekeeping settings and restarts the housekeeping timer.
    fn set_housekeeping(
        &mut self,
        retention: Option<Duration>,
        housekeeping_duration: Option<Duration>,
        record_limit: Option<usize>,
    ) {
        self.retention = retention;
        self.housekeeping = housekeeping_duration;
        self.last_housekeeping = Instant::now();
        self.record_limit = record_limit;
    }
    /// True when periodic housekeeping is enabled and its period has elapsed.
    fn should_housekeep(&self) -> bool {
        match self.housekeeping {
            Some(duration) => self.last_housekeeping.elapsed() > duration,
            None => false,
        }
    }
    /// Runs a housekeeping pass (errors are logged inside, not returned) and
    /// resets the housekeeping timer.
    fn housekeep(&mut self) -> Result<(), diesel::result::Error> {
        SqliteExporter::housekeeping(&mut self.db, self.retention, self.record_limit, false);
        self.last_housekeeping = Instant::now();
        Ok(())
    }
    /// True when the flush interval elapsed or the queue hit its size limit.
    fn should_flush(&self) -> bool {
        if self.last_flush.elapsed() > self.flush_duration {
            true
        } else if self.queue.len() >= FLUSH_QUEUE_LIMIT {
            debug!("Flushing due to queue size ({} items)", self.queue.len());
            true
        } else {
            false
        }
    }
    /// Writes all queued metrics to the database in a single transaction.
    ///
    /// Inserts are chunked so each statement stays under SQLite's bound
    /// variable limit. On failure the drained metrics are pushed back onto
    /// the queue so they can be retried on the next flush.
    fn flush(&mut self) -> Result<(), diesel::result::Error> {
        use crate::schema::metrics::dsl::metrics;
        if self.queue.is_empty() {
            // Nothing to write; still reset the timer so empty periods do not
            // trigger back-to-back flush attempts.
            self.last_flush = Instant::now();
            return Ok(());
        }
        let drain_buffer: Vec<NewMetric> = self.queue.drain(..).collect();
        let db = &mut self.db;
        let transaction_result = db.transaction::<_, diesel::result::Error, _>(|db| {
            // .max(1) guards against a zero chunk size (chunks() panics on 0).
            let chunk_size = INSERT_BATCH_SIZE.max(1);
            for chunk in drain_buffer.chunks(chunk_size) {
                insert_into(metrics).values(chunk).execute(db)?;
            }
            Ok(())
        });
        match transaction_result {
            Ok(()) => {
                self.last_flush = Instant::now();
                Ok(())
            }
            Err(e) => {
                // Transaction rolled back; restore the metrics for retry.
                self.queue.extend(drain_buffer);
                Err(e)
            }
        }
    }
    /// Queues a single metric value, resolving (and caching) the key's
    /// database row id on first use.
    fn queue_metric(&mut self, timestamp: Duration, key: &str, value: f64) -> Result<()> {
        let metric_key_id = match self.key_ids.get(key) {
            Some(key) => *key,
            None => {
                debug!("Looking up {}", key);
                let key_id = MetricKey::key_by_name(key, &mut self.db)?.id;
                self.key_ids.insert(key.to_string(), key_id);
                key_id
            }
        };
        let metric = NewMetric {
            timestamp: timestamp.as_secs_f64(),
            metric_key_id,
            value: value as _,
        };
        self.queue.push_back(metric);
        Ok(())
    }

    /// Computes summary values for `metrics` within the session delimited by
    /// `signpost`; delegates to `query::metrics_summary_for_signpost_and_keys`.
    pub fn metrics_summary_for_signpost_and_keys(
        &mut self,
        signpost: String,
        metrics: Vec<String>,
    ) -> Result<HashMap<String, f64>> {
        query::metrics_summary_for_signpost_and_keys(&mut self.db, &signpost, metrics)
    }
}
340
/// Spawns the background worker thread that owns the database connection,
/// drains the event channel, and periodically flushes queued metrics.
///
/// # Panics
/// Panics if the OS refuses to spawn the thread (`Builder::spawn` failure).
fn run_worker(
    db: SqliteConnection,
    receiver: Receiver<Event>,
    flush_duration: Duration,
) -> JoinHandle<()> {
    thread::Builder::new()
        .name("metrics-sqlite: worker".to_string())
        .spawn(move || {
            let mut state = InnerState::new(flush_duration, db);
            info!("SQLite worker started");
            loop {
                // Evaluated before blocking on the channel: if a steady stream
                // of events keeps recv_timeout from ever timing out, this
                // still forces a flush once the interval has elapsed.
                let time_based_flush = state.last_flush.elapsed() >= flush_duration;

                let (should_flush, should_exit) = match receiver.recv_timeout(flush_duration) {
                    Ok(Event::Stop) => {
                        info!("Stopping SQLiteExporter worker, flushing & exiting");
                        (true, true)
                    }
                    Ok(Event::SetHousekeeping {
                        retention_period,
                        housekeeping_period,
                        record_limit,
                    }) => {
                        state.set_housekeeping(retention_period, housekeeping_period, record_limit);
                        (false, false)
                    }
                    Ok(Event::DescribeKey(_key_type, key, unit, desc)) => {
                        info!("Describing key {:?}", key);
                        if let Err(e) = MetricKey::create_or_update(
                            key.as_str(),
                            unit,
                            Some(desc.as_ref()),
                            &mut state.db,
                        ) {
                            error!("Failed to create key entry: {:?}", e);
                        }
                        (false, false)
                    }
                    // Registration is a no-op; key rows are created on
                    // describe or looked up lazily by queue_metric.
                    Ok(Event::RegisterKey(_key_type, _key, _handle)) => {
                        (false, false)
                    }
                    Ok(Event::IncrementCounter(timestamp, key, value)) => {
                        let key_name = key.name();
                        // Counters accumulate in memory; the running total is
                        // what gets persisted for each observation.
                        let entry = state.counters.entry(key.clone()).or_insert(0);
                        let value = {
                            *entry += value;
                            *entry
                        };
                        if let Err(e) = state.queue_metric(timestamp, key_name, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }

                        (state.should_flush(), false)
                    }
                    Ok(Event::AbsoluteCounter(timestamp, key, value)) => {
                        let key_name = key.name();
                        state.counters.insert(key.clone(), value);
                        if let Err(e) = state.queue_metric(timestamp, key_name, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::UpdateGauge(timestamp, key, value)) => {
                        let key_name = key.name();
                        // Relative updates need the last value; unseen gauges
                        // start from 0.0.
                        let entry = state.last_values.entry(key.clone()).or_insert(0.0);
                        let value = match value {
                            GaugeValue::Absolute(v) => {
                                *entry = v;
                                *entry
                            }
                            GaugeValue::Increment(v) => {
                                *entry += v;
                                *entry
                            }
                            GaugeValue::Decrement(v) => {
                                *entry -= v;
                                *entry
                            }
                        };
                        if let Err(e) = state.queue_metric(timestamp, key_name, value) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::UpdateHistogram(timestamp, key, value)) => {
                        let key_name = key.name();
                        if let Err(e) = state.queue_metric(timestamp, key_name, value) {
                            error!("Error queueing metric: {:?}", e);
                        }

                        (state.should_flush(), false)
                    }
                    Ok(Event::RequestSummaryFromSignpost {
                        signpost_key,
                        keys,
                        tx,
                    }) => {
                        // Flush first so the summary sees all queued values.
                        match state.flush() {
                            Ok(()) => match state
                                .metrics_summary_for_signpost_and_keys(signpost_key, keys)
                            {
                                Ok(metrics) => {
                                    if tx.send(Ok(metrics)).is_err() {
                                        error!(
                                            "Failed to respond with metrics results, discarding"
                                        );
                                    }
                                }
                                Err(e) => {
                                    if let Err(e) = tx.send(Err(e)) {
                                        error!(
                                            "Failed to respond with metrics error result, discarding: {e:?}"
                                        );
                                    }
                                }
                            },
                            Err(e) => {
                                let err = MetricsError::from(e);
                                error!(
                                    "Failed to flush pending metrics before summary request: {err:?}"
                                );
                                if let Err(send_err) = tx.send(Err(err)) {
                                    error!(
                                        "Failed to respond with metrics flush error result, discarding: {send_err:?}"
                                    );
                                }
                            }
                        }
                        (false, false)
                    }
                    // No events for a whole interval: flush whatever queued.
                    Err(RecvTimeoutError::Timeout) => {
                        (true, false)
                    }
                    // All senders dropped; flush and shut down.
                    Err(RecvTimeoutError::Disconnected) => {
                        warn!("SQLiteExporter channel disconnected, exiting worker");
                        (true, true)
                    }
                };

                if time_based_flush || should_flush {
                    if time_based_flush {
                        debug!("Flushing due to elapsed time ({}s)", flush_duration.as_secs());
                    }
                    if let Err(e) = state.flush() {
                        error!("Error flushing metrics: {}", e);
                    }
                }
                if state.should_housekeep() {
                    if let Err(e) = state.housekeep() {
                        error!("Failed running house keeping: {:?}", e);
                    }
                }
                if should_exit {
                    break;
                }
            }
        })
        .unwrap()
}
503
504impl SqliteExporter {
505 pub fn new<P: AsRef<Path>>(
511 flush_interval: Duration,
512 keep_duration: Option<Duration>,
513 path: P,
514 ) -> Result<Self> {
515 let mut db = setup_db_or_reset(path)?;
516 Self::housekeeping(&mut db, keep_duration, None, true);
517 let (sender, receiver) = std::sync::mpsc::sync_channel(BACKGROUND_CHANNEL_LIMIT);
518 let thread = run_worker(db, receiver, flush_interval);
519 let exporter = SqliteExporter {
520 thread: Some(thread),
521 sender,
522 };
523 Ok(exporter)
524 }
525
526 pub fn set_periodic_housekeeping(
531 &self,
532 periodic_duration: Option<Duration>,
533 retention: Option<Duration>,
534 record_limit: Option<usize>,
535 ) {
536 if let Err(e) = self.sender.send(Event::SetHousekeeping {
537 retention_period: retention,
538 housekeeping_period: periodic_duration,
539 record_limit,
540 }) {
541 error!("Failed to set house keeping settings: {:?}", e);
542 }
543 }
544
545 fn housekeeping(
549 db: &mut SqliteConnection,
550 keep_duration: Option<Duration>,
551 record_limit: Option<usize>,
552 vacuum: bool,
553 ) {
554 use crate::schema::metrics::dsl::*;
555 use diesel::dsl::count;
556 if let Some(keep_duration) = keep_duration {
557 match SystemTime::UNIX_EPOCH.elapsed() {
558 Ok(now) => {
559 let cutoff = now - keep_duration;
560 trace!("Deleting data {}s old", keep_duration.as_secs());
561 if let Err(e) =
562 diesel::delete(metrics.filter(timestamp.le(cutoff.as_secs_f64())))
563 .execute(db)
564 {
565 error!("Failed to remove old metrics data: {}", e);
566 }
567 if vacuum {
568 if let Err(e) = sql_query("VACUUM").execute(db) {
569 error!("Failed to vacuum SQLite DB: {:?}", e);
570 }
571 }
572 }
573 Err(e) => {
574 error!(
575 "System time error, skipping metrics-sqlite housekeeping: {}",
576 e
577 );
578 }
579 }
580 }
581 if let Some(record_limit) = record_limit {
582 trace!("Checking for records over {} limit", record_limit);
583 match metrics.select(count(id)).first::<i64>(db) {
584 Ok(records) => {
585 let records = records as usize;
586 if records > record_limit {
587 let excess = records - record_limit + (record_limit / 4); trace!(
589 "Exceeded limit! {} > {}, deleting {} oldest",
590 records,
591 record_limit,
592 excess
593 );
594 let query = format!("DELETE FROM metrics WHERE id IN (SELECT id FROM metrics ORDER BY timestamp ASC LIMIT {excess});");
595 if let Err(e) = sql_query(query).execute(db) {
596 error!("Failed to delete excessive records: {:?}", e);
597 }
598 }
599 }
600 Err(e) => {
601 error!("Failed to get record count: {:?}", e);
602 }
603 }
604 }
605 }
606
607 pub fn install(self) -> Result<SqliteExporterHandle, SetRecorderError<Self>> {
609 let handle = SqliteExporterHandle {
610 sender: self.sender.clone(),
611 };
612 metrics::set_global_recorder(self)?;
613 Ok(handle)
614 }
615}
616impl Drop for SqliteExporter {
617 fn drop(&mut self) {
618 let _ = self.sender.send(Event::Stop);
619 let _ = self.thread.take().unwrap().join();
620 }
621}
622
#[cfg(test)]
mod tests {
    use crate::SqliteExporter;
    use std::time::{Duration, Instant};

    /// Smoke test: installs the exporter as the global `metrics` recorder and
    /// hammers it from five threads for ~5 seconds.
    ///
    /// NOTE(review): this writes `metrics.db` into the current working
    /// directory and installs a process-global recorder, so it can interfere
    /// with other tests running in the same binary.
    #[test]
    fn test_threading() {
        use std::thread;
        SqliteExporter::new(Duration::from_millis(500), None, "metrics.db")
            .unwrap()
            .install()
            .unwrap();
        let joins: Vec<thread::JoinHandle<()>> = (0..5)
            .map(|_| {
                thread::spawn(move || {
                    let start = Instant::now();
                    loop {
                        metrics::gauge!("rate").set(1.0);
                        metrics::counter!("hits").increment(1);
                        metrics::histogram!("histogram").record(5.0);
                        if start.elapsed().as_secs() >= 5 {
                            break;
                        }
                    }
                })
            })
            .collect();
        for j in joins {
            j.join().unwrap();
        }
    }
}