1#![deny(missing_docs)]
2#[macro_use]
5extern crate diesel;
6#[macro_use]
7extern crate diesel_migrations;
8use tracing::{debug, error, info, trace, warn};
9
10use diesel::prelude::*;
11use diesel::{insert_into, sql_query};
12
13use metrics::{GaugeValue, Key, KeyName, SetRecorderError, SharedString, Unit};
14
15use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
16use std::sync::Arc;
17use std::{
18 collections::{HashMap, VecDeque},
19 path::{Path, PathBuf},
20 sync::mpsc::{Receiver, RecvTimeoutError, SyncSender},
21 thread::{self, JoinHandle},
22 time::{Duration, Instant, SystemTime},
23};
24use thiserror::Error;
25
/// Metrics buffered in memory before a flush to SQLite is forced (see
/// `InnerState::should_flush`).
const FLUSH_QUEUE_LIMIT: usize = 1000;
/// Capacity of the bounded channel between recorder callers and the worker thread.
const BACKGROUND_CHANNEL_LIMIT: usize = 8000;
/// SQLite's default cap on bound variables per statement (SQLITE_MAX_VARIABLE_NUMBER).
const SQLITE_DEFAULT_MAX_VARIABLES: usize = 999;
/// Bound fields per inserted metrics row: timestamp, metric_key_id, value.
const METRIC_FIELDS_PER_ROW: usize = 3;
/// Rows per batched INSERT, sized so total bound variables stay under the SQLite cap.
const INSERT_BATCH_SIZE: usize = SQLITE_DEFAULT_MAX_VARIABLES / METRIC_FIELDS_PER_ROW;
32
/// All errors this crate can produce.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Failed to establish the SQLite connection.
    #[error("Database error: {0}")]
    DbConnectionError(#[from] ConnectionError),
    /// Running the embedded migrations failed.
    #[error("Migration error: {0}")]
    MigrationError(Box<dyn std::error::Error + Send + Sync>),
    /// A diesel query failed; also synthesized by `setup_db` when
    /// `PRAGMA quick_check` reports corruption.
    #[error("Error querying DB: {0}")]
    QueryError(#[from] diesel::result::Error),
    /// The database path is not valid UTF-8 (diesel needs a `&str` URL).
    #[error("Invalid database path")]
    InvalidDatabasePath,
    /// Underlying I/O error (only with the `csv` feature).
    #[cfg(feature = "csv")]
    #[error("IO Error: {0}")]
    IoError(#[from] std::io::Error),
    /// CSV serialization error (only with the `csv` feature).
    #[cfg(feature = "csv")]
    #[error("CSV Error: {0}")]
    CsvError(#[from] csv::Error),
    /// The database contains no metrics rows.
    #[error("Database has no metrics stored in it")]
    EmptyDatabase,
    /// No metric key with the given name exists in the database.
    #[error("Metric key {0} not found in database")]
    KeyNotFound(String),
    /// The background worker thread is gone (stopped or crashed).
    #[error("Exporter task has been stopped or crashed")]
    ExporterUnavailable,
    /// The signpost session's start and end timestamps are identical.
    #[error("Session for signpost `{0}` has zero duration")]
    ZeroLengthSession(String),
    /// The requested session contained no samples for this key.
    #[error("No metrics recorded for `{0}` in requested session")]
    NoMetricsForKey(String),
}
72
73impl MetricsError {
74 fn is_malformed_db(&self) -> bool {
76 self.to_string().contains("malformed")
77 }
78}
79
/// Crate-wide result alias defaulting the error type to [`MetricsError`].
pub type Result<T, E = MetricsError> = std::result::Result<T, E>;
82
83mod metrics_db;
84mod models;
85mod recorder;
86mod schema;
87
88use crate::metrics_db::query;
89use crate::recorder::Handle;
90pub use metrics_db::{MetricsDb, Session};
91pub use models::{Metric, MetricKey, NewMetric};
92
/// Diesel schema migrations embedded into the binary at compile time.
pub(crate) const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
94
/// Row shape returned by `PRAGMA quick_check;` — a single text column.
#[derive(QueryableByName)]
struct PragmaCheckResult {
    #[diesel(sql_type = diesel::sql_types::Text)]
    #[diesel(column_name = quick_check)]
    // "ok" when the database passes the integrity check, otherwise a
    // description of the corruption found.
    result: String,
}
101
102fn remove_db_files(path: &Path) {
104 let db_path = PathBuf::from(path);
105 for suffix in &["", "-wal", "-shm"] {
106 let mut file_path = db_path.clone().into_os_string();
107 file_path.push(suffix);
108 let file_path = PathBuf::from(file_path);
109 if file_path.exists() {
110 if let Err(e) = std::fs::remove_file(&file_path) {
111 error!("Failed to remove {}: {}", file_path.display(), e);
112 } else {
113 info!("Removed corrupt database file: {}", file_path.display());
114 }
115 }
116 }
117}
118
/// Opens (or creates) the SQLite database at `path`, applies connection
/// PRAGMAs, runs pending embedded migrations, and verifies integrity.
///
/// PRAGMAs: WAL journaling, and a 5s busy timeout so writes wait on lock
/// contention instead of failing immediately.
///
/// # Errors
/// - [`MetricsError::InvalidDatabasePath`] for non-UTF-8 paths.
/// - A synthesized [`MetricsError::QueryError`] whose message contains
///   "malformed" when `PRAGMA quick_check` fails — that wording is what
///   `MetricsError::is_malformed_db` keys on upstream.
fn setup_db<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
    let url = path
        .as_ref()
        .to_str()
        .ok_or(MetricsError::InvalidDatabasePath)?;
    let mut db = SqliteConnection::establish(url)?;

    sql_query("PRAGMA journal_mode=WAL;").execute(&mut db)?;

    sql_query("PRAGMA busy_timeout = 5000;").execute(&mut db)?;

    db.run_pending_migrations(MIGRATIONS)
        .map_err(MetricsError::MigrationError)?;

    // Integrity check: quick_check returns the single row "ok" on success,
    // otherwise a description of what is broken.
    let check: String = sql_query("PRAGMA quick_check;")
        .get_result::<PragmaCheckResult>(&mut db)?
        .result;
    if check != "ok" {
        // Embed "malformed" so setup_db_or_reset's is_malformed_db()
        // heuristic triggers a wipe-and-recreate.
        return Err(MetricsError::QueryError(
            diesel::result::Error::DatabaseError(
                diesel::result::DatabaseErrorKind::Unknown,
                Box::new(format!("database disk image is malformed: {check}")),
            ),
        ));
    }

    Ok(db)
}
150
151fn setup_db_or_reset<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
153 let path = path.as_ref();
154 match setup_db(path) {
155 Ok(db) => Ok(db),
156 Err(err) if err.is_malformed_db() => {
157 warn!(
158 "Database is malformed, removing and recreating: {}",
159 path.display()
160 );
161 remove_db_files(path);
162 setup_db(path)
163 }
164 Err(err) => Err(err),
165 }
166}
/// Which kind of metric a key was described/registered as.
enum RegisterType {
    Counter,
    Gauge,
    Histogram,
}
172
/// Messages sent over the channel from recorder handles and the public API
/// to the background worker thread.
enum Event {
    /// Shut the worker down after a final flush.
    Stop,
    /// Create or update a key's metadata row (unit, description).
    DescribeKey(RegisterType, KeyName, Option<Unit>, SharedString),
    /// A metric handle was registered; currently a no-op in the worker.
    RegisterKey(RegisterType, Key, Arc<Handle>),
    /// Add a delta to a counter at the given timestamp.
    IncrementCounter(Duration, Key, u64),
    /// Set a counter to an absolute value at the given timestamp.
    AbsoluteCounter(Duration, Key, u64),
    /// Apply an absolute/increment/decrement update to a gauge.
    UpdateGauge(Duration, Key, GaugeValue),
    /// Record one histogram sample.
    UpdateHistogram(Duration, Key, f64),
    /// Reconfigure periodic housekeeping; `None` disables the given aspect.
    SetHousekeeping {
        retention_period: Option<Duration>,
        housekeeping_period: Option<Duration>,
        record_limit: Option<usize>,
    },
    /// Flush queued metrics, then compute per-key summary values for the
    /// session delimited by `signpost_key` and reply over `tx`.
    RequestSummaryFromSignpost {
        signpost_key: String,
        keys: Vec<String>,
        tx: tokio::sync::oneshot::Sender<Result<HashMap<String, f64>>>,
    },
}
192
/// Handle to a running exporter's background worker, usable to issue
/// queries (e.g. session summaries) after the exporter itself has been
/// installed as the global recorder.
pub struct SqliteExporterHandle {
    // Channel into the worker thread's event loop.
    sender: SyncSender<Event>,
}
impl SqliteExporterHandle {
    /// Requests the average value of each metric in `with_keys` over the
    /// session delimited by signpost `from_signpost`, keyed by metric name.
    ///
    /// Blocks the calling thread until the worker replies.
    /// NOTE(review): `tokio::sync::oneshot::Receiver::blocking_recv` must not
    /// be called from within an async runtime — confirm callers are
    /// synchronous contexts.
    ///
    /// # Errors
    /// [`MetricsError::ExporterUnavailable`] if the worker has stopped or
    /// dropped the reply channel; otherwise whatever error the summary query
    /// itself produced.
    pub fn request_average_metrics(
        &self,
        from_signpost: &str,
        with_keys: &[&str],
    ) -> Result<HashMap<String, f64>> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.sender
            .send(Event::RequestSummaryFromSignpost {
                signpost_key: from_signpost.to_string(),
                keys: with_keys.iter().map(|s| s.to_string()).collect(),
                tx,
            })
            .map_err(|_| MetricsError::ExporterUnavailable)?;
        match rx.blocking_recv() {
            // The worker replies with a Result; flatten it into ours.
            Ok(metrics) => Ok(metrics?),
            Err(_) => Err(MetricsError::ExporterUnavailable),
        }
    }
}
218
/// A recorder for the `metrics` facade that persists samples into a SQLite
/// database from a dedicated background worker thread.
pub struct SqliteExporter {
    // Worker thread handle; signalled and joined on drop.
    thread: Option<JoinHandle<()>>,
    // Channel into the worker thread's event loop.
    sender: SyncSender<Event>,
}
/// All state owned by the background worker thread.
struct InnerState {
    // Open SQLite connection, exclusive to the worker.
    db: SqliteConnection,
    // When housekeeping last ran.
    last_housekeeping: Instant,
    // Period between housekeeping passes; `None` disables them.
    housekeeping: Option<Duration>,
    // Maximum age of rows to retain; `None` keeps everything.
    retention: Option<Duration>,
    // Maximum row count to retain; `None` is unlimited.
    record_limit: Option<usize>,
    // Interval between time-based flushes.
    flush_duration: Duration,
    // When the queue was last flushed to SQLite.
    last_flush: Instant,
    // Last value per gauge key, for applying increments/decrements.
    last_values: HashMap<Key, f64>,
    // Running totals per counter key.
    counters: HashMap<Key, u64>,
    // Cache of metric name -> DB key id to avoid repeated lookups.
    key_ids: HashMap<String, i64>,
    // Samples waiting to be written to SQLite.
    queue: VecDeque<NewMetric>,
}
impl InnerState {
    /// Creates worker state around an open connection; caches start empty and
    /// both the flush and housekeeping clocks start "now".
    fn new(flush_duration: Duration, db: SqliteConnection) -> Self {
        InnerState {
            db,
            last_housekeeping: Instant::now(),
            housekeeping: None,
            retention: None,
            record_limit: None,
            flush_duration,
            last_flush: Instant::now(),
            last_values: HashMap::new(),
            counters: HashMap::new(),
            key_ids: HashMap::new(),
            queue: VecDeque::with_capacity(FLUSH_QUEUE_LIMIT),
        }
    }
    /// Applies new housekeeping settings and restarts the housekeeping timer.
    fn set_housekeeping(
        &mut self,
        retention: Option<Duration>,
        housekeeping_duration: Option<Duration>,
        record_limit: Option<usize>,
    ) {
        self.retention = retention;
        self.housekeeping = housekeeping_duration;
        self.last_housekeeping = Instant::now();
        self.record_limit = record_limit;
    }
    /// True when periodic housekeeping is enabled and its period has elapsed.
    fn should_housekeep(&self) -> bool {
        match self.housekeeping {
            Some(duration) => self.last_housekeeping.elapsed() > duration,
            None => false,
        }
    }
    /// Runs one housekeeping pass (without VACUUM) and restarts the timer.
    /// Always returns `Ok`: `SqliteExporter::housekeeping` logs and swallows
    /// its own errors internally.
    fn housekeep(&mut self) -> Result<(), diesel::result::Error> {
        SqliteExporter::housekeeping(&mut self.db, self.retention, self.record_limit, false);
        self.last_housekeeping = Instant::now();
        Ok(())
    }
    /// True when either the flush interval elapsed or the in-memory queue
    /// reached `FLUSH_QUEUE_LIMIT`.
    fn should_flush(&self) -> bool {
        if self.last_flush.elapsed() > self.flush_duration {
            true
        } else if self.queue.len() >= FLUSH_QUEUE_LIMIT {
            debug!("Flushing due to queue size ({} items)", self.queue.len());
            true
        } else {
            false
        }
    }
    /// Writes all queued samples to SQLite in a single transaction, batching
    /// INSERTs so each statement stays under the SQLite bind-variable limit.
    ///
    /// On failure the drained rows are pushed back onto the queue (in order)
    /// so the next flush retries them. NOTE(review): while the DB keeps
    /// failing, the queue can grow past FLUSH_QUEUE_LIMIT without bound.
    fn flush(&mut self) -> Result<(), diesel::result::Error> {
        use crate::schema::metrics::dsl::metrics;
        if self.queue.is_empty() {
            self.last_flush = Instant::now();
            return Ok(());
        }
        let drain_buffer: Vec<NewMetric> = self.queue.drain(..).collect();
        let db = &mut self.db;
        let transaction_result = db.transaction::<_, diesel::result::Error, _>(|db| {
            // .max(1) guards against a zero batch size, which would make
            // chunks() panic.
            let chunk_size = INSERT_BATCH_SIZE.max(1);
            for chunk in drain_buffer.chunks(chunk_size) {
                insert_into(metrics).values(chunk).execute(db)?;
            }
            Ok(())
        });
        match transaction_result {
            Ok(()) => {
                self.last_flush = Instant::now();
                Ok(())
            }
            Err(e) => {
                // Restore the rows for retry; the queue was empty meanwhile,
                // so original ordering is preserved.
                self.queue.extend(drain_buffer);
                Err(e)
            }
        }
    }
    /// Queues one sample, resolving (and memoizing) the metric key's DB id.
    /// Fails if `MetricKey::key_by_name` cannot find the key — presumably
    /// the row is created earlier via `DescribeKey`; verify against recorder.
    fn queue_metric(&mut self, timestamp: Duration, key: &str, value: f64) -> Result<()> {
        let metric_key_id = match self.key_ids.get(key) {
            Some(key) => *key,
            None => {
                debug!("Looking up {}", key);
                let key_id = MetricKey::key_by_name(key, &mut self.db)?.id;
                self.key_ids.insert(key.to_string(), key_id);
                key_id
            }
        };
        let metric = NewMetric {
            timestamp: timestamp.as_secs_f64(),
            metric_key_id,
            value: value as _,
        };
        self.queue.push_back(metric);
        Ok(())
    }

    /// Computes per-key summary values for the session delimited by the
    /// `signpost` key, delegating to
    /// `query::metrics_summary_for_signpost_and_keys`.
    pub fn metrics_summary_for_signpost_and_keys(
        &mut self,
        signpost: String,
        metrics: Vec<String>,
    ) -> Result<HashMap<String, f64>> {
        query::metrics_summary_for_signpost_and_keys(&mut self.db, &signpost, metrics)
    }
}
340
/// Spawns the background worker thread that owns the DB connection, drains
/// the event channel, and flushes queued samples periodically.
fn run_worker(
    db: SqliteConnection,
    receiver: Receiver<Event>,
    flush_duration: Duration,
) -> JoinHandle<()> {
    thread::Builder::new()
        .name("metrics-sqlite: worker".to_string())
        .spawn(move || {
            let mut state = InnerState::new(flush_duration, db);
            info!("SQLite worker started");
            loop {
                // NOTE(review): measured BEFORE the blocking recv below, so
                // this reflects the flush clock at loop start, not after the
                // event has arrived.
                let time_based_flush = state.last_flush.elapsed() >= flush_duration;

                let mut should_flush = false;
                let mut should_exit = false;
                // Block for at most one flush interval so time-based flushes
                // still happen when no events arrive.
                match receiver.recv_timeout(flush_duration) {
                    Ok(Event::Stop) => {
                        info!("Stopping SQLiteExporter worker, flushing & exiting");
                        should_flush = true;
                        should_exit = true;
                    }
                    Ok(Event::SetHousekeeping {
                        retention_period,
                        housekeeping_period,
                        record_limit,
                    }) => {
                        state.set_housekeeping(retention_period, housekeeping_period, record_limit);
                    }
                    Ok(Event::DescribeKey(_key_type, key, unit, desc)) => {
                        info!("Describing key {:?}", key);
                        // Creates/updates the key's metadata row in the DB.
                        if let Err(e) = MetricKey::create_or_update(
                            key.as_str(),
                            unit,
                            Some(desc.as_ref()),
                            &mut state.db,
                        ) {
                            error!("Failed to create key entry: {:?}", e);
                        }
                    }
                    Ok(Event::RegisterKey(_key_type, _key, _handle)) => {
                        // Registration is a no-op here; key rows are handled
                        // via DescribeKey.
                    }
                    Ok(Event::IncrementCounter(timestamp, key, value)) => {
                        let key_name = key.name();
                        // Counters accumulate in memory; the stored sample is
                        // the running total, not the delta.
                        let entry = state.counters.entry(key.clone()).or_insert(0);
                        let value = {
                            *entry += value;
                            *entry
                        };
                        if let Err(e) = state.queue_metric(timestamp, key_name, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        should_flush = state.should_flush();
                    }
                    Ok(Event::AbsoluteCounter(timestamp, key, value)) => {
                        let key_name = key.name();
                        // Absolute sets overwrite the running total.
                        state.counters.insert(key.clone(), value);
                        if let Err(e) = state.queue_metric(timestamp, key_name, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        should_flush = state.should_flush();
                    }
                    Ok(Event::UpdateGauge(timestamp, key, value)) => {
                        let key_name = key.name();
                        // Gauges store the post-update value; increments and
                        // decrements apply to the last known value (0.0 if
                        // never seen).
                        let entry = state.last_values.entry(key.clone()).or_insert(0.0);
                        let value = match value {
                            GaugeValue::Absolute(v) => {
                                *entry = v;
                                *entry
                            }
                            GaugeValue::Increment(v) => {
                                *entry += v;
                                *entry
                            }
                            GaugeValue::Decrement(v) => {
                                *entry -= v;
                                *entry
                            }
                        };
                        if let Err(e) = state.queue_metric(timestamp, key_name, value) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        should_flush = state.should_flush();
                    }
                    Ok(Event::UpdateHistogram(timestamp, key, value)) => {
                        let key_name = key.name();
                        // Histogram samples are stored raw, one row each.
                        if let Err(e) = state.queue_metric(timestamp, key_name, value) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        should_flush = state.should_flush();
                    }
                    Ok(Event::RequestSummaryFromSignpost {
                        signpost_key,
                        keys,
                        tx,
                    }) => {
                        // Flush first so the summary sees every queued sample.
                        match state.flush() {
                            Ok(()) => match state
                                .metrics_summary_for_signpost_and_keys(signpost_key, keys)
                            {
                                Ok(metrics) => {
                                    if tx.send(Ok(metrics)).is_err() {
                                        error!(
                                            "Failed to respond with metrics results, discarding"
                                        );
                                    }
                                }
                                Err(e) => {
                                    if let Err(e) = tx.send(Err(e)) {
                                        error!(
                                            "Failed to respond with metrics error result, discarding: {e:?}"
                                        );
                                    }
                                }
                            },
                            Err(e) => {
                                let err = MetricsError::from(e);
                                error!(
                                    "Failed to flush pending metrics before summary request: {err:?}"
                                );
                                if let Err(send_err) = tx.send(Err(err)) {
                                    error!(
                                        "Failed to respond with metrics flush error result, discarding: {send_err:?}"
                                    );
                                }
                            }
                        }
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        // No events for a full interval: flush whatever queued.
                        should_flush = true;
                    }
                    Err(RecvTimeoutError::Disconnected) => {
                        // All senders dropped — flush and shut down.
                        warn!("SQLiteExporter channel disconnected, exiting worker");
                        should_flush = true;
                        should_exit = true;
                    }
                }

                if time_based_flush || should_flush {
                    if time_based_flush {
                        debug!("Flushing due to elapsed time ({}s)", flush_duration.as_secs());
                    }
                    if let Err(e) = state.flush() {
                        error!("Error flushing metrics: {}", e);
                    }
                }
                if state.should_housekeep() {
                    if let Err(e) = state.housekeep() {
                        error!("Failed running house keeping: {:?}", e);
                    }
                }
                if should_exit {
                    break;
                }
            }
        })
        .unwrap()
}
501
502impl SqliteExporter {
503 pub fn new<P: AsRef<Path>>(
509 flush_interval: Duration,
510 keep_duration: Option<Duration>,
511 path: P,
512 ) -> Result<Self> {
513 let mut db = setup_db_or_reset(path)?;
514 Self::housekeeping(&mut db, keep_duration, None, true);
515 let (sender, receiver) = std::sync::mpsc::sync_channel(BACKGROUND_CHANNEL_LIMIT);
516 let thread = run_worker(db, receiver, flush_interval);
517 let exporter = SqliteExporter {
518 thread: Some(thread),
519 sender,
520 };
521 Ok(exporter)
522 }
523
524 pub fn set_periodic_housekeeping(
529 &self,
530 periodic_duration: Option<Duration>,
531 retention: Option<Duration>,
532 record_limit: Option<usize>,
533 ) {
534 if let Err(e) = self.sender.send(Event::SetHousekeeping {
535 retention_period: retention,
536 housekeeping_period: periodic_duration,
537 record_limit,
538 }) {
539 error!("Failed to set house keeping settings: {:?}", e);
540 }
541 }
542
543 fn housekeeping(
547 db: &mut SqliteConnection,
548 keep_duration: Option<Duration>,
549 record_limit: Option<usize>,
550 vacuum: bool,
551 ) {
552 use crate::schema::metrics::dsl::*;
553 use diesel::dsl::count;
554 if let Some(keep_duration) = keep_duration {
555 match SystemTime::UNIX_EPOCH.elapsed() {
556 Ok(now) => {
557 let cutoff = now - keep_duration;
558 trace!("Deleting data {}s old", keep_duration.as_secs());
559 if let Err(e) =
560 diesel::delete(metrics.filter(timestamp.le(cutoff.as_secs_f64())))
561 .execute(db)
562 {
563 error!("Failed to remove old metrics data: {}", e);
564 }
565 if vacuum {
566 if let Err(e) = sql_query("VACUUM").execute(db) {
567 error!("Failed to vacuum SQLite DB: {:?}", e);
568 }
569 }
570 }
571 Err(e) => {
572 error!(
573 "System time error, skipping metrics-sqlite housekeeping: {}",
574 e
575 );
576 }
577 }
578 }
579 if let Some(record_limit) = record_limit {
580 trace!("Checking for records over {} limit", record_limit);
581 match metrics.select(count(id)).first::<i64>(db) {
582 Ok(records) => {
583 let records = records as usize;
584 if records > record_limit {
585 let excess = records - record_limit + (record_limit / 4); trace!(
587 "Exceeded limit! {} > {}, deleting {} oldest",
588 records, record_limit, excess
589 );
590 let query = format!(
591 "DELETE FROM metrics WHERE id IN (SELECT id FROM metrics ORDER BY timestamp ASC LIMIT {excess});"
592 );
593 if let Err(e) = sql_query(query).execute(db) {
594 error!("Failed to delete excessive records: {:?}", e);
595 }
596 }
597 }
598 Err(e) => {
599 error!("Failed to get record count: {:?}", e);
600 }
601 }
602 }
603 }
604
605 pub fn install(self) -> Result<SqliteExporterHandle, SetRecorderError<Self>> {
607 let handle = SqliteExporterHandle {
608 sender: self.sender.clone(),
609 };
610 metrics::set_global_recorder(self)?;
611 Ok(handle)
612 }
613}
614impl Drop for SqliteExporter {
615 fn drop(&mut self) {
616 let _ = self.sender.send(Event::Stop);
617 let _ = self.thread.take().unwrap().join();
618 }
619}
620
#[cfg(test)]
mod tests {
    use crate::SqliteExporter;
    use std::time::{Duration, Instant};

    /// Hammers the global recorder from several threads for ~5s to smoke-test
    /// the exporter's channel and worker under concurrent load.
    #[test]
    fn test_threading() {
        use std::thread;
        // Write to a per-process temp-dir path instead of "metrics.db" in the
        // current working directory, so test runs neither pollute the repo
        // nor collide with concurrent runs.
        let db_path = std::env::temp_dir()
            .join(format!("metrics-sqlite-test-{}.db", std::process::id()));
        SqliteExporter::new(Duration::from_millis(500), None, &db_path)
            .unwrap()
            .install()
            .unwrap();
        let joins: Vec<thread::JoinHandle<()>> = (0..5)
            .map(|_| {
                thread::spawn(move || {
                    let start = Instant::now();
                    loop {
                        metrics::gauge!("rate").set(1.0);
                        metrics::counter!("hits").increment(1);
                        metrics::histogram!("histogram").record(5.0);
                        if start.elapsed().as_secs() >= 5 {
                            break;
                        }
                    }
                })
            })
            .collect();
        for j in joins {
            j.join().unwrap();
        }
    }
}