#![deny(missing_docs)]
//! A SQLite-backed exporter for the [`metrics`] facade.
//!
//! Metric events are forwarded over a channel to a background worker thread,
//! which batches them and periodically flushes them to a SQLite database via
//! Diesel, keeping the recording hot path cheap.
#[macro_use]
extern crate diesel;
#[macro_use]
extern crate diesel_migrations;

use diesel::prelude::*;
use diesel::{insert_into, sql_query};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
use metrics::{GaugeValue, Key, KeyName, SetRecorderError, SharedString, Unit};
use std::sync::Arc;
use std::{
    collections::{HashMap, VecDeque},
    path::Path,
    sync::mpsc::{Receiver, RecvTimeoutError, SyncSender},
    thread::{self, JoinHandle},
    time::{Duration, Instant, SystemTime},
};
use thiserror::Error;
use tracing::{debug, error, info, trace, warn};
/// Flush to the database early once this many metrics are queued in memory.
const FLUSH_QUEUE_LIMIT: usize = 1000;
/// Capacity of the channel between recorder handles and the worker thread.
const BACKGROUND_CHANNEL_LIMIT: usize = 8000;
/// SQLite's default `SQLITE_MAX_VARIABLE_NUMBER` (bound parameters per statement).
const SQLITE_DEFAULT_MAX_VARIABLES: usize = 999;
/// Each inserted row binds three values: timestamp, metric key id, and value.
const METRIC_FIELDS_PER_ROW: usize = 3;
/// Rows per batched `INSERT` (999 / 3 = 333), keeping each statement under the
/// bound-variable limit.
const INSERT_BATCH_SIZE: usize = SQLITE_DEFAULT_MAX_VARIABLES / METRIC_FIELDS_PER_ROW;

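/// Errors returned by metrics-sqlite operations.
///
/// A sketch of handling specific failures (the signpost and key names are
/// illustrative):
///
/// ```no_run
/// # fn demo(handle: &metrics_sqlite::SqliteExporterHandle) {
/// match handle.request_average_metrics("session_start", &["fps"]) {
///     Ok(averages) => println!("{averages:?}"),
///     Err(metrics_sqlite::MetricsError::ExporterUnavailable) => {
///         eprintln!("exporter worker is gone");
///     }
///     Err(e) => eprintln!("query failed: {e}"),
/// }
/// # }
/// ```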
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Failed to open a connection to the SQLite database.
    #[error("Database error: {0}")]
    DbConnectionError(#[from] ConnectionError),
    /// Failed to run the embedded schema migrations.
    #[error("Migration error: {0}")]
    MigrationError(Box<dyn std::error::Error + Send + Sync>),
    /// A query against the database failed.
    #[error("Error querying DB: {0}")]
    QueryError(#[from] diesel::result::Error),
    /// The database path is not valid UTF-8.
    #[error("Invalid database path")]
    InvalidDatabasePath,
    /// An I/O error occurred while exporting CSV.
    #[cfg(feature = "csv")]
    #[error("IO Error: {0}")]
    IoError(#[from] std::io::Error),
    /// A CSV serialization error occurred.
    #[cfg(feature = "csv")]
    #[error("CSV Error: {0}")]
    CsvError(#[from] csv::Error),
    /// The database contains no metrics.
    #[error("Database has no metrics stored in it")]
    EmptyDatabase,
    /// The requested metric key does not exist in the database.
    #[error("Metric key {0} not found in database")]
    KeyNotFound(String),
    /// The background worker has been stopped or crashed.
    #[error("Exporter task has been stopped or crashed")]
    ExporterUnavailable,
    /// The session bounded by the given signpost has zero duration.
    #[error("Session for signpost `{0}` has zero duration")]
    ZeroLengthSession(String),
    /// No metrics were recorded for the given key in the requested session.
    #[error("No metrics recorded for `{0}` in requested session")]
    NoMetricsForKey(String),
}
/// Convenience result type used throughout this crate, defaulting the error
/// type to [`MetricsError`].
pub type Result<T, E = MetricsError> = std::result::Result<T, E>;

mod metrics_db;
mod models;
mod recorder;
mod schema;

use crate::metrics_db::query;
use crate::recorder::Handle;
pub use metrics_db::{MetricsDb, Session};
pub use models::{Metric, MetricKey, NewMetric};

pub(crate) const MIGRATIONS: EmbeddedMigrations = embed_migrations!();

fn setup_db<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
    let url = path
        .as_ref()
        .to_str()
        .ok_or(MetricsError::InvalidDatabasePath)?;
    let mut db = SqliteConnection::establish(url)?;

    // WAL lets readers proceed while the worker thread writes.
    sql_query("PRAGMA journal_mode=WAL;").execute(&mut db)?;

    // Wait up to 5s for a locked database instead of failing immediately.
    sql_query("PRAGMA busy_timeout = 5000;").execute(&mut db)?;

    db.run_pending_migrations(MIGRATIONS)
        .map_err(MetricsError::MigrationError)?;

    Ok(db)
}
/// The kind of metric a key is being described or registered as.
enum RegisterType {
    Counter,
    Gauge,
    Histogram,
}

/// Messages sent from recorder handles to the background worker thread.
enum Event {
    /// Flush any queued metrics and shut the worker down.
    Stop,
    DescribeKey(RegisterType, KeyName, Option<Unit>, SharedString),
    RegisterKey(RegisterType, Key, Arc<Handle>),
    IncrementCounter(Duration, Key, u64),
    AbsoluteCounter(Duration, Key, u64),
    UpdateGauge(Duration, Key, GaugeValue),
    UpdateHistogram(Duration, Key, f64),
    SetHousekeeping {
        retention_period: Option<Duration>,
        housekeeping_period: Option<Duration>,
        record_limit: Option<usize>,
    },
    /// Flush, then reply with per-key averages for the session bounded by a
    /// signpost metric.
    RequestSummaryFromSignpost {
        signpost_key: String,
        keys: Vec<String>,
        tx: tokio::sync::oneshot::Sender<Result<HashMap<String, f64>>>,
    },
}

/// A handle to a running [`SqliteExporter`], used to query summaries from the
/// worker thread.
pub struct SqliteExporterHandle {
    sender: SyncSender<Event>,
}
impl SqliteExporterHandle {
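    /// Requests the average value of each metric in `with_keys` over the
    /// session bounded by the signpost metric `from_signpost`.
    ///
    /// This blocks on a reply from the worker thread, so it must not be called
    /// from an async context. A minimal sketch, assuming a handle obtained from
    /// [`SqliteExporter::install`] (signpost and key names are illustrative):
    ///
    /// ```no_run
    /// # fn demo(handle: &metrics_sqlite::SqliteExporterHandle) -> metrics_sqlite::Result<()> {
    /// let averages = handle.request_average_metrics("session_start", &["fps", "cpu_usage"])?;
    /// for (key, avg) in &averages {
    ///     println!("{key}: {avg}");
    /// }
    /// # Ok(())
    /// # }
    /// ```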
    pub fn request_average_metrics(
        &self,
        from_signpost: &str,
        with_keys: &[&str],
    ) -> Result<HashMap<String, f64>> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.sender
            .send(Event::RequestSummaryFromSignpost {
                signpost_key: from_signpost.to_string(),
                keys: with_keys.iter().map(|s| s.to_string()).collect(),
                tx,
            })
            .map_err(|_| MetricsError::ExporterUnavailable)?;
        match rx.blocking_recv() {
            Ok(metrics) => Ok(metrics?),
            Err(_) => Err(MetricsError::ExporterUnavailable),
        }
    }
}

/// Exports metrics to a SQLite database from a background worker thread.
pub struct SqliteExporter {
    thread: Option<JoinHandle<()>>,
    sender: SyncSender<Event>,
}

/// State owned by the background worker thread.
struct InnerState {
    db: SqliteConnection,
    last_housekeeping: Instant,
    housekeeping: Option<Duration>,
    retention: Option<Duration>,
    record_limit: Option<usize>,
    flush_duration: Duration,
    last_flush: Instant,
    last_values: HashMap<Key, f64>,
    counters: HashMap<Key, u64>,
    key_ids: HashMap<String, i64>,
    queue: VecDeque<NewMetric>,
}
impl InnerState {
    fn new(flush_duration: Duration, db: SqliteConnection) -> Self {
        InnerState {
            db,
            last_housekeeping: Instant::now(),
            housekeeping: None,
            retention: None,
            record_limit: None,
            flush_duration,
            last_flush: Instant::now(),
            last_values: HashMap::new(),
            counters: HashMap::new(),
            key_ids: HashMap::new(),
            queue: VecDeque::with_capacity(FLUSH_QUEUE_LIMIT),
        }
    }
    fn set_housekeeping(
        &mut self,
        retention: Option<Duration>,
        housekeeping_duration: Option<Duration>,
        record_limit: Option<usize>,
    ) {
        self.retention = retention;
        self.housekeeping = housekeeping_duration;
        self.last_housekeeping = Instant::now();
        self.record_limit = record_limit;
    }
    fn should_housekeep(&self) -> bool {
        match self.housekeeping {
            Some(duration) => self.last_housekeeping.elapsed() > duration,
            None => false,
        }
    }
    fn housekeep(&mut self) -> Result<(), diesel::result::Error> {
        // `housekeeping` logs and swallows its own errors, so this always succeeds.
        SqliteExporter::housekeeping(&mut self.db, self.retention, self.record_limit, false);
        self.last_housekeeping = Instant::now();
        Ok(())
    }
    fn should_flush(&self) -> bool {
        if self.last_flush.elapsed() > self.flush_duration {
            true
        } else if self.queue.len() >= FLUSH_QUEUE_LIMIT {
            debug!("Flushing due to queue size ({} items)", self.queue.len());
            true
        } else {
            false
        }
    }
    fn flush(&mut self) -> Result<(), diesel::result::Error> {
        use crate::schema::metrics::dsl::metrics;
        if self.queue.is_empty() {
            self.last_flush = Instant::now();
            return Ok(());
        }
        let drain_buffer: Vec<NewMetric> = self.queue.drain(..).collect();
        let db = &mut self.db;
        // Insert in chunks so each statement stays under SQLite's bound-variable
        // limit, all within a single transaction for atomicity.
        let transaction_result = db.transaction::<_, diesel::result::Error, _>(|db| {
            let chunk_size = INSERT_BATCH_SIZE.max(1);
            for chunk in drain_buffer.chunks(chunk_size) {
                insert_into(metrics).values(chunk).execute(db)?;
            }
            Ok(())
        });
        match transaction_result {
            Ok(()) => {
                self.last_flush = Instant::now();
                Ok(())
            }
            Err(e) => {
                // Put the metrics back in the queue so a later flush can retry them.
                self.queue.extend(drain_buffer);
                Err(e)
            }
        }
    }
    fn queue_metric(&mut self, timestamp: Duration, key: &str, value: f64) -> Result<()> {
        // Resolve the key's row id, caching it to avoid a lookup per sample.
        let metric_key_id = match self.key_ids.get(key) {
            Some(id) => *id,
            None => {
                debug!("Looking up {}", key);
                let key_id = MetricKey::key_by_name(key, &mut self.db)?.id;
                self.key_ids.insert(key.to_string(), key_id);
                key_id
            }
        };
        let metric = NewMetric {
            timestamp: timestamp.as_secs_f64(),
            metric_key_id,
            value: value as _,
        };
        self.queue.push_back(metric);
        Ok(())
    }

    /// Computes per-key average values for the session bounded by `signpost`.
    fn metrics_summary_for_signpost_and_keys(
        &mut self,
        signpost: String,
        metrics: Vec<String>,
    ) -> Result<HashMap<String, f64>> {
        query::metrics_summary_for_signpost_and_keys(&mut self.db, &signpost, metrics)
    }
}

fn run_worker(
    db: SqliteConnection,
    receiver: Receiver<Event>,
    flush_duration: Duration,
) -> JoinHandle<()> {
    thread::Builder::new()
        .name("metrics-sqlite: worker".to_string())
        .spawn(move || {
            let mut state = InnerState::new(flush_duration, db);
            info!("SQLite worker started");
            loop {
                // Check for an overdue flush up front; some of the events
                // handled below never trigger one themselves.
                let time_based_flush = state.last_flush.elapsed() >= flush_duration;

                let (should_flush, should_exit) = match receiver.recv_timeout(flush_duration) {
                    Ok(Event::Stop) => {
                        info!("Stopping SQLiteExporter worker, flushing & exiting");
                        (true, true)
                    }
                    Ok(Event::SetHousekeeping {
                        retention_period,
                        housekeeping_period,
                        record_limit,
                    }) => {
                        state.set_housekeeping(retention_period, housekeeping_period, record_limit);
                        (false, false)
                    }
                    Ok(Event::DescribeKey(_key_type, key, unit, desc)) => {
                        info!("Describing key {:?}", key);
                        if let Err(e) = MetricKey::create_or_update(
                            key.as_str(),
                            unit,
                            Some(desc.as_ref()),
                            &mut state.db,
                        ) {
                            error!("Failed to create key entry: {:?}", e);
                        }
                        (false, false)
                    }
                    Ok(Event::RegisterKey(_key_type, _key, _handle)) => {
                        // Registration needs no database work here.
                        (false, false)
                    }
                    Ok(Event::IncrementCounter(timestamp, key, value)) => {
                        let key_name = key.name();
                        // Counters are stored as running totals: apply the delta
                        // in memory and record the new total.
                        let entry = state.counters.entry(key.clone()).or_insert(0);
                        let value = {
                            *entry += value;
                            *entry
                        };
                        if let Err(e) = state.queue_metric(timestamp, key_name, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::AbsoluteCounter(timestamp, key, value)) => {
                        let key_name = key.name();
                        state.counters.insert(key.clone(), value);
                        if let Err(e) = state.queue_metric(timestamp, key_name, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::UpdateGauge(timestamp, key, value)) => {
                        let key_name = key.name();
                        let entry = state.last_values.entry(key.clone()).or_insert(0.0);
                        let value = match value {
                            GaugeValue::Absolute(v) => {
                                *entry = v;
                                *entry
                            }
                            GaugeValue::Increment(v) => {
                                *entry += v;
                                *entry
                            }
                            GaugeValue::Decrement(v) => {
                                *entry -= v;
                                *entry
                            }
                        };
                        if let Err(e) = state.queue_metric(timestamp, key_name, value) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::UpdateHistogram(timestamp, key, value)) => {
                        let key_name = key.name();
                        if let Err(e) = state.queue_metric(timestamp, key_name, value) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::RequestSummaryFromSignpost {
                        signpost_key,
                        keys,
                        tx,
                    }) => {
                        // Flush first so queued samples are visible to the query.
                        match state.flush() {
                            Ok(()) => {
                                match state.metrics_summary_for_signpost_and_keys(signpost_key, keys)
                                {
                                    Ok(metrics) => {
                                        if tx.send(Ok(metrics)).is_err() {
                                            error!("Failed to respond with metrics results, discarding");
                                        }
                                    }
                                    Err(e) => {
                                        if let Err(e) = tx.send(Err(e)) {
                                            error!("Failed to respond with metrics error result, discarding: {e:?}");
                                        }
                                    }
                                }
                            }
                            Err(e) => {
                                let err = MetricsError::from(e);
                                error!("Failed to flush pending metrics before summary request: {err:?}");
                                if let Err(send_err) = tx.send(Err(err)) {
                                    error!("Failed to respond with metrics flush error result, discarding: {send_err:?}");
                                }
                            }
                        }
                        (false, false)
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        // No events for a full interval; time to flush.
                        (true, false)
                    }
                    Err(RecvTimeoutError::Disconnected) => {
                        warn!("SQLiteExporter channel disconnected, exiting worker");
                        (true, true)
                    }
                };

                if time_based_flush || should_flush {
                    if time_based_flush {
                        debug!(
                            "Flushing due to elapsed time ({}s)",
                            flush_duration.as_secs()
                        );
                    }
                    if let Err(e) = state.flush() {
                        error!("Error flushing metrics: {}", e);
                    }
                }
                if state.should_housekeep() {
                    if let Err(e) = state.housekeep() {
                        error!("Failed running housekeeping: {:?}", e);
                    }
                }
                if should_exit {
                    break;
                }
            }
        })
        .expect("failed to spawn metrics-sqlite worker thread")
}

impl SqliteExporter {
    /// Creates a new `SqliteExporter`, opening (or creating) the SQLite
    /// database at `path`, running one housekeeping pass (including a
    /// `VACUUM`), and spawning the background worker that flushes queued
    /// metrics every `flush_interval`.
    pub fn new<P: AsRef<Path>>(
        flush_interval: Duration,
        keep_duration: Option<Duration>,
        path: P,
    ) -> Result<Self> {
        let mut db = setup_db(path)?;
        Self::housekeeping(&mut db, keep_duration, None, true);
        let (sender, receiver) = std::sync::mpsc::sync_channel(BACKGROUND_CHANNEL_LIMIT);
        let thread = run_worker(db, receiver, flush_interval);
        let exporter = SqliteExporter {
            thread: Some(thread),
            sender,
        };
        Ok(exporter)
    }

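    /// Enables (or disables) periodic housekeeping on the worker thread:
    /// `retention` bounds metric age, `record_limit` bounds row count, and
    /// `periodic_duration` controls how often the check runs.
    ///
    /// A minimal sketch with illustrative values:
    ///
    /// ```no_run
    /// # use std::time::Duration;
    /// # fn demo(exporter: &metrics_sqlite::SqliteExporter) {
    /// // Hourly checks, 30-day retention, at most one million rows.
    /// exporter.set_periodic_housekeeping(
    ///     Some(Duration::from_secs(60 * 60)),
    ///     Some(Duration::from_secs(30 * 24 * 60 * 60)),
    ///     Some(1_000_000),
    /// );
    /// # }
    /// ```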
    pub fn set_periodic_housekeeping(
        &self,
        periodic_duration: Option<Duration>,
        retention: Option<Duration>,
        record_limit: Option<usize>,
    ) {
        if let Err(e) = self.sender.send(Event::SetHousekeeping {
            retention_period: retention,
            housekeeping_period: periodic_duration,
            record_limit,
        }) {
            error!("Failed to set housekeeping settings: {:?}", e);
        }
    }

    /// Deletes metrics older than `keep_duration` and, if `record_limit` is
    /// set, trims the oldest rows once the table exceeds the limit. Errors
    /// are logged rather than returned.
    fn housekeeping(
        db: &mut SqliteConnection,
        keep_duration: Option<Duration>,
        record_limit: Option<usize>,
        vacuum: bool,
    ) {
        use crate::schema::metrics::dsl::*;
        use diesel::dsl::count;
        if let Some(keep_duration) = keep_duration {
            match SystemTime::UNIX_EPOCH.elapsed() {
                Ok(now) => {
                    let cutoff = now - keep_duration;
                    trace!("Deleting data older than {}s", keep_duration.as_secs());
                    if let Err(e) =
                        diesel::delete(metrics.filter(timestamp.le(cutoff.as_secs_f64())))
                            .execute(db)
                    {
                        error!("Failed to remove old metrics data: {}", e);
                    }
                    if vacuum {
                        if let Err(e) = sql_query("VACUUM").execute(db) {
                            error!("Failed to vacuum SQLite DB: {:?}", e);
                        }
                    }
                }
                Err(e) => {
                    error!(
                        "System time error, skipping metrics-sqlite housekeeping: {}",
                        e
                    );
                }
            }
        }
        if let Some(record_limit) = record_limit {
            trace!("Checking for records over {} limit", record_limit);
            match metrics.select(count(id)).first::<i64>(db) {
                Ok(records) => {
                    let records = records as usize;
                    if records > record_limit {
                        // Delete an extra 25% of the limit beyond the excess so we
                        // are not trimming again on every housekeeping pass.
                        let excess = records - record_limit + (record_limit / 4);
                        trace!(
                            "Exceeded limit! {} > {}, deleting {} oldest",
                            records,
                            record_limit,
                            excess
                        );
                        let query = format!("DELETE FROM metrics WHERE id IN (SELECT id FROM metrics ORDER BY timestamp ASC LIMIT {excess});");
                        if let Err(e) = sql_query(query).execute(db) {
                            error!("Failed to delete excess records: {:?}", e);
                        }
                    }
                }
                Err(e) => {
                    error!("Failed to get record count: {:?}", e);
                }
            }
        }
    }

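    /// Installs this exporter as the global [`metrics`] recorder, returning a
    /// [`SqliteExporterHandle`] that can still talk to the worker thread.
    ///
    /// A minimal end-to-end sketch (assumes the crate compiles as
    /// `metrics_sqlite`; the database path is illustrative):
    ///
    /// ```no_run
    /// use std::time::Duration;
    ///
    /// let exporter = metrics_sqlite::SqliteExporter::new(
    ///     Duration::from_secs(5),
    ///     None,
    ///     "metrics.db",
    /// )
    /// .unwrap();
    /// let handle = exporter.install().unwrap();
    /// metrics::counter!("hits").increment(1);
    /// ```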
    pub fn install(self) -> Result<SqliteExporterHandle, SetRecorderError<Self>> {
        let handle = SqliteExporterHandle {
            sender: self.sender.clone(),
        };
        metrics::set_global_recorder(self)?;
        Ok(handle)
    }
}
impl Drop for SqliteExporter {
    fn drop(&mut self) {
        // Ask the worker to flush and exit, then wait for it to finish.
        let _ = self.sender.send(Event::Stop);
        if let Some(thread) = self.thread.take() {
            let _ = thread.join();
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::SqliteExporter;
    use std::time::{Duration, Instant};

    #[test]
    fn test_threading() {
        use std::thread;
        SqliteExporter::new(Duration::from_millis(500), None, "metrics.db")
            .unwrap()
            .install()
            .unwrap();
        // Hammer the recorder from several threads for five seconds to shake
        // out races between the handles and the worker.
        let joins: Vec<thread::JoinHandle<()>> = (0..5)
            .map(|_| {
                thread::spawn(move || {
                    let start = Instant::now();
                    loop {
                        metrics::gauge!("rate").set(1.0);
                        metrics::counter!("hits").increment(1);
                        metrics::histogram!("histogram").record(5.0);
                        if start.elapsed().as_secs() >= 5 {
                            break;
                        }
                    }
                })
            })
            .collect();
        for j in joins {
            j.join().unwrap();
        }
    }
}