#![deny(missing_docs)]
#[macro_use]
extern crate diesel;
#[macro_use]
extern crate diesel_migrations;
use tracing::{debug, error, info, trace, warn};
use diesel::prelude::*;
use diesel::{insert_into, sql_query};
use metrics::{GaugeValue, Key, KeyName, SetRecorderError, SharedString, Unit};
use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
use std::sync::Arc;
use std::{
collections::{HashMap, VecDeque},
path::{Path, PathBuf},
sync::mpsc::{Receiver, RecvTimeoutError, SyncSender},
thread::{self, JoinHandle},
time::{Duration, Instant, SystemTime},
};
use thiserror::Error;
/// Maximum number of queued metrics in the worker before a flush is forced.
const FLUSH_QUEUE_LIMIT: usize = 1000;
/// Bound of the sync channel between recorder handles and the worker thread.
const BACKGROUND_CHANNEL_LIMIT: usize = 8000;
/// SQLite's default limit on bound variables per statement (SQLITE_MAX_VARIABLE_NUMBER).
const SQLITE_DEFAULT_MAX_VARIABLES: usize = 999;
/// Bound fields per inserted metrics row (timestamp, metric_key_id, value).
const METRIC_FIELDS_PER_ROW: usize = 3;
/// Rows per INSERT batch so one statement never exceeds the variable limit.
const INSERT_BATCH_SIZE: usize = SQLITE_DEFAULT_MAX_VARIABLES / METRIC_FIELDS_PER_ROW;
/// Errors produced by the metrics-sqlite exporter and its query helpers.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Failed to establish the SQLite connection.
    #[error("Database error: {0}")]
    DbConnectionError(#[from] ConnectionError),
    /// Running embedded migrations failed.
    #[error("Migration error: {0}")]
    MigrationError(Box<dyn std::error::Error + Send + Sync>),
    /// A diesel query against the database failed.
    #[error("Error querying DB: {0}")]
    QueryError(#[from] diesel::result::Error),
    /// The database path could not be converted to a UTF-8 connection URL.
    #[error("Invalid database path")]
    InvalidDatabasePath,
    /// I/O failure (CSV export feature only).
    #[cfg(feature = "csv")]
    #[error("IO Error: {0}")]
    IoError(#[from] std::io::Error),
    /// CSV serialization failure (CSV export feature only).
    #[cfg(feature = "csv")]
    #[error("CSV Error: {0}")]
    CsvError(#[from] csv::Error),
    /// The database contains no metrics at all.
    #[error("Database has no metrics stored in it")]
    EmptyDatabase,
    /// The named metric key does not exist in the database.
    #[error("Metric key {0} not found in database")]
    KeyNotFound(String),
    /// The background worker is gone (stopped or crashed), so requests cannot
    /// be delivered or answered.
    #[error("Exporter task has been stopped or crashed")]
    ExporterUnavailable,
    /// A signpost-delimited session has zero duration.
    #[error("Session for signpost `{0}` has zero duration")]
    ZeroLengthSession(String),
    /// No samples for the named key exist within the requested session.
    #[error("No metrics recorded for `{0}` in requested session")]
    NoMetricsForKey(String),
}
impl MetricsError {
    /// True when the rendered error text indicates a corrupt ("malformed")
    /// SQLite database image, so callers can recreate the file.
    fn is_malformed_db(&self) -> bool {
        let rendered = self.to_string();
        rendered.contains("malformed")
    }
}
/// Crate-wide result alias defaulting the error type to [`MetricsError`].
pub type Result<T, E = MetricsError> = std::result::Result<T, E>;
mod metrics_db;
mod models;
mod recorder;
mod schema;
use crate::metrics_db::query;
use crate::recorder::Handle;
pub use metrics_db::{MetricsDb, Session};
pub use models::{Metric, MetricKey, NewMetric};
/// Diesel migrations embedded into the binary at compile time.
pub(crate) const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
/// Row shape returned by `PRAGMA quick_check;` — the text is "ok" when the
/// database passes the integrity check.
#[derive(QueryableByName)]
struct PragmaCheckResult {
    #[diesel(sql_type = diesel::sql_types::Text)]
    #[diesel(column_name = quick_check)]
    // Single text column holding the integrity-check verdict.
    result: String,
}
fn remove_db_files(path: &Path) {
let db_path = PathBuf::from(path);
for suffix in &["", "-wal", "-shm"] {
let mut file_path = db_path.clone().into_os_string();
file_path.push(suffix);
let file_path = PathBuf::from(file_path);
if file_path.exists() {
if let Err(e) = std::fs::remove_file(&file_path) {
error!("Failed to remove {}: {}", file_path.display(), e);
} else {
info!("Removed corrupt database file: {}", file_path.display());
}
}
}
}
/// Open (or create) the SQLite database at `path`, enable WAL journaling and a
/// busy timeout, run pending migrations, and verify integrity with
/// `PRAGMA quick_check`.
///
/// # Errors
/// Fails if the path is not valid UTF-8, the connection cannot be established,
/// migrations fail, or the integrity check reports anything but "ok".
fn setup_db<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
    let url = path
        .as_ref()
        .to_str()
        .ok_or(MetricsError::InvalidDatabasePath)?;
    let mut conn = SqliteConnection::establish(url)?;
    // WAL allows concurrent readers; the busy timeout avoids immediate
    // SQLITE_BUSY failures when another connection holds the write lock.
    for pragma in ["PRAGMA journal_mode=WAL;", "PRAGMA busy_timeout = 5000;"] {
        sql_query(pragma).execute(&mut conn)?;
    }
    conn.run_pending_migrations(MIGRATIONS)
        .map_err(MetricsError::MigrationError)?;
    let check: String = sql_query("PRAGMA quick_check;")
        .get_result::<PragmaCheckResult>(&mut conn)?
        .result;
    if check == "ok" {
        Ok(conn)
    } else {
        // Surface the verdict as a query error containing "malformed" so
        // `is_malformed_db` can recognize it upstream.
        Err(MetricsError::QueryError(
            diesel::result::Error::DatabaseError(
                diesel::result::DatabaseErrorKind::Unknown,
                Box::new(format!("database disk image is malformed: {check}")),
            ),
        ))
    }
}
/// Like [`setup_db`], but if the database reports corruption the on-disk files
/// are removed and setup is attempted once more on a fresh database.
fn setup_db_or_reset<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
    let path = path.as_ref();
    let first_attempt = setup_db(path);
    match first_attempt {
        Err(err) if err.is_malformed_db() => {
            warn!(
                "Database is malformed, removing and recreating: {}",
                path.display()
            );
            remove_db_files(path);
            // Second (and final) attempt on a clean slate.
            setup_db(path)
        }
        other => other,
    }
}
/// Kind of metric a key was described/registered as.
enum RegisterType {
    Counter,
    Gauge,
    Histogram,
}
/// Messages sent from recorder handles and the public API to the worker thread.
enum Event {
    /// Flush pending metrics and shut the worker down.
    Stop,
    /// Persist a key's unit/description metadata.
    DescribeKey(RegisterType, KeyName, Option<Unit>, SharedString),
    /// Announce a key/handle registration (currently a no-op in the worker).
    RegisterKey(RegisterType, Key, Arc<Handle>),
    /// Add the amount to a counter's running total at the given timestamp.
    IncrementCounter(Duration, Key, u64),
    /// Set a counter to an absolute value.
    AbsoluteCounter(Duration, Key, u64),
    /// Apply an absolute/increment/decrement update to a gauge.
    UpdateGauge(Duration, Key, GaugeValue),
    /// Record one histogram sample.
    UpdateHistogram(Duration, Key, f64),
    /// Reconfigure retention / periodic housekeeping; `None` disables the
    /// corresponding behaviour.
    SetHousekeeping {
        retention_period: Option<Duration>,
        housekeeping_period: Option<Duration>,
        record_limit: Option<usize>,
    },
    /// Request summarized values for `keys` within the session marked by
    /// `signpost_key`; the result is sent back over the oneshot `tx`.
    RequestSummaryFromSignpost {
        signpost_key: String,
        keys: Vec<String>,
        tx: tokio::sync::oneshot::Sender<Result<HashMap<String, f64>>>,
    },
}
/// Handle to a running [`SqliteExporter`] worker, returned by
/// [`SqliteExporter::install`], used to request metric summaries.
pub struct SqliteExporterHandle {
    // Producer side of the channel feeding the worker thread.
    sender: SyncSender<Event>,
}
impl SqliteExporterHandle {
    /// Ask the worker for summary values of `with_keys` within the session
    /// delimited by the `from_signpost` metric.
    ///
    /// Blocks the calling thread until the worker replies (uses
    /// `blocking_recv`, so do not call this from inside an async runtime).
    ///
    /// # Errors
    /// Returns [`MetricsError::ExporterUnavailable`] if the worker has stopped
    /// or its reply channel was dropped, or any error the worker hit while
    /// flushing/querying.
    pub fn request_average_metrics(
        &self,
        from_signpost: &str,
        with_keys: &[&str],
    ) -> Result<HashMap<String, f64>> {
        let (tx, rx) = tokio::sync::oneshot::channel();
        self.sender
            .send(Event::RequestSummaryFromSignpost {
                signpost_key: from_signpost.to_string(),
                keys: with_keys.iter().map(|s| s.to_string()).collect(),
                tx,
            })
            .map_err(|_| MetricsError::ExporterUnavailable)?;
        // The worker answers on the oneshot once its summary query completes;
        // the inner `?` propagates query errors returned by the worker.
        match rx.blocking_recv() {
            Ok(metrics) => Ok(metrics?),
            Err(_) => Err(MetricsError::ExporterUnavailable),
        }
    }
}
/// A `metrics`-crate exporter that persists recorded metrics into a SQLite
/// database via a background worker thread.
pub struct SqliteExporter {
    // Worker thread handle; joined on drop after sending `Event::Stop`.
    thread: Option<JoinHandle<()>>,
    // Producer side of the bounded channel feeding the worker.
    sender: SyncSender<Event>,
}
/// Mutable state owned by the background worker thread.
struct InnerState {
    db: SqliteConnection,
    // When housekeeping last ran, plus its period / retention / row cap.
    last_housekeeping: Instant,
    housekeeping: Option<Duration>,
    retention: Option<Duration>,
    record_limit: Option<usize>,
    // Flush cadence and time of the last successful flush.
    flush_duration: Duration,
    last_flush: Instant,
    // Running gauge values, so increments/decrements can be applied.
    last_values: HashMap<Key, f64>,
    // Running counter totals, so increments accumulate.
    counters: HashMap<Key, u64>,
    // Cache of metric name -> DB key id to avoid repeated lookups.
    key_ids: HashMap<String, i64>,
    // Samples waiting for the next batched INSERT.
    queue: VecDeque<NewMetric>,
}
impl InnerState {
    /// Build a fresh worker state around an open connection.
    fn new(flush_duration: Duration, db: SqliteConnection) -> Self {
        InnerState {
            db,
            last_housekeeping: Instant::now(),
            housekeeping: None,
            retention: None,
            record_limit: None,
            flush_duration,
            last_flush: Instant::now(),
            last_values: HashMap::new(),
            counters: HashMap::new(),
            key_ids: HashMap::new(),
            queue: VecDeque::with_capacity(FLUSH_QUEUE_LIMIT),
        }
    }
    /// Update housekeeping configuration and restart its timer.
    fn set_housekeeping(
        &mut self,
        retention: Option<Duration>,
        housekeeping_duration: Option<Duration>,
        record_limit: Option<usize>,
    ) {
        self.retention = retention;
        self.housekeeping = housekeeping_duration;
        self.last_housekeeping = Instant::now();
        self.record_limit = record_limit;
    }
    /// True when a housekeeping period is configured and has elapsed.
    fn should_housekeep(&self) -> bool {
        match self.housekeeping {
            Some(duration) => self.last_housekeeping.elapsed() > duration,
            None => false,
        }
    }
    /// Run one housekeeping pass (no VACUUM) and reset the timer.
    /// Internal failures are logged by `housekeeping` rather than returned.
    fn housekeep(&mut self) -> Result<(), diesel::result::Error> {
        SqliteExporter::housekeeping(&mut self.db, self.retention, self.record_limit, false);
        self.last_housekeeping = Instant::now();
        Ok(())
    }
    /// True when the flush interval elapsed or the queue hit its size cap.
    fn should_flush(&self) -> bool {
        if self.last_flush.elapsed() > self.flush_duration {
            true
        } else if self.queue.len() >= FLUSH_QUEUE_LIMIT {
            debug!("Flushing due to queue size ({} items)", self.queue.len());
            true
        } else {
            false
        }
    }
    /// Write all queued metrics to the database in one transaction.
    ///
    /// Inserts are chunked so each statement stays under SQLite's bound
    /// variable limit. On failure the drained rows are pushed back onto the
    /// (now empty) queue, preserving order, and `last_flush` is left untouched
    /// so the flush is retried on the next opportunity.
    fn flush(&mut self) -> Result<(), diesel::result::Error> {
        use crate::schema::metrics::dsl::metrics;
        if self.queue.is_empty() {
            // Nothing to write; still reset the timer so we don't spin.
            self.last_flush = Instant::now();
            return Ok(());
        }
        let drain_buffer: Vec<NewMetric> = self.queue.drain(..).collect();
        let db = &mut self.db;
        let transaction_result = db.transaction::<_, diesel::result::Error, _>(|db| {
            // `.max(1)` guards against a zero chunk size (chunks() panics on 0).
            let chunk_size = INSERT_BATCH_SIZE.max(1);
            for chunk in drain_buffer.chunks(chunk_size) {
                insert_into(metrics).values(chunk).execute(db)?;
            }
            Ok(())
        });
        match transaction_result {
            Ok(()) => {
                self.last_flush = Instant::now();
                Ok(())
            }
            Err(e) => {
                // Transaction rolled back: requeue everything for a retry.
                self.queue.extend(drain_buffer);
                Err(e)
            }
        }
    }
    /// Append one sample to the in-memory queue, resolving (and caching) the
    /// metric key's database id on first use.
    fn queue_metric(&mut self, timestamp: Duration, key: &str, value: f64) -> Result<()> {
        let metric_key_id = match self.key_ids.get(key) {
            Some(key) => *key,
            None => {
                debug!("Looking up {}", key);
                let key_id = MetricKey::key_by_name(key, &mut self.db)?.id;
                self.key_ids.insert(key.to_string(), key_id);
                key_id
            }
        };
        let metric = NewMetric {
            timestamp: timestamp.as_secs_f64(),
            metric_key_id,
            value: value as _,
        };
        self.queue.push_back(metric);
        Ok(())
    }
    /// Summarize the given metric names over the session marked by `signpost`
    /// (delegates to the query module).
    pub fn metrics_summary_for_signpost_and_keys(
        &mut self,
        signpost: String,
        metrics: Vec<String>,
    ) -> Result<HashMap<String, f64>> {
        query::metrics_summary_for_signpost_and_keys(&mut self.db, &signpost, metrics)
    }
}
/// Spawn the background worker thread that owns the SQLite connection.
///
/// The worker loops on the event channel using `flush_duration` as its receive
/// timeout, so queued metrics are flushed at least that often even when no
/// events arrive. It exits (after a final flush) on `Event::Stop` or when all
/// senders have been dropped.
fn run_worker(
    db: SqliteConnection,
    receiver: Receiver<Event>,
    flush_duration: Duration,
) -> JoinHandle<()> {
    thread::Builder::new()
        .name("metrics-sqlite: worker".to_string())
        .spawn(move || {
            let mut state = InnerState::new(flush_duration, db);
            info!("SQLite worker started");
            loop {
                // Checked before blocking: if the interval has already elapsed
                // we flush this iteration regardless of which event arrives.
                let time_based_flush = state.last_flush.elapsed() >= flush_duration;
                let mut should_flush = false;
                let mut should_exit = false;
                match receiver.recv_timeout(flush_duration) {
                    Ok(Event::Stop) => {
                        info!("Stopping SQLiteExporter worker, flushing & exiting");
                        should_flush = true;
                        should_exit = true;
                    }
                    Ok(Event::SetHousekeeping {
                        retention_period,
                        housekeeping_period,
                        record_limit,
                    }) => {
                        state.set_housekeeping(retention_period, housekeeping_period, record_limit);
                    }
                    Ok(Event::DescribeKey(_key_type, key, unit, desc)) => {
                        info!("Describing key {:?}", key);
                        // Persist unit/description metadata for this key.
                        if let Err(e) = MetricKey::create_or_update(
                            key.as_str(),
                            unit,
                            Some(desc.as_ref()),
                            &mut state.db,
                        ) {
                            error!("Failed to create key entry: {:?}", e);
                        }
                    }
                    Ok(Event::RegisterKey(_key_type, _key, _handle)) => {
                        // No bookkeeping needed at registration time; key rows
                        // are resolved lazily when the first sample is queued.
                    }
                    Ok(Event::IncrementCounter(timestamp, key, value)) => {
                        let key_name = key.name();
                        // Counters accumulate: update the running total and
                        // record that total as the stored sample value.
                        let entry = state.counters.entry(key.clone()).or_insert(0);
                        let value = {
                            *entry += value;
                            *entry
                        };
                        if let Err(e) = state.queue_metric(timestamp, key_name, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        should_flush = state.should_flush();
                    }
                    Ok(Event::AbsoluteCounter(timestamp, key, value)) => {
                        let key_name = key.name();
                        // Overwrite the running total so later increments build
                        // on the absolute value.
                        state.counters.insert(key.clone(), value);
                        if let Err(e) = state.queue_metric(timestamp, key_name, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        should_flush = state.should_flush();
                    }
                    Ok(Event::UpdateGauge(timestamp, key, value)) => {
                        let key_name = key.name();
                        // Gauges support relative updates, so keep the last
                        // value (defaulting to 0.0) and apply the delta.
                        let entry = state.last_values.entry(key.clone()).or_insert(0.0);
                        let value = match value {
                            GaugeValue::Absolute(v) => {
                                *entry = v;
                                *entry
                            }
                            GaugeValue::Increment(v) => {
                                *entry += v;
                                *entry
                            }
                            GaugeValue::Decrement(v) => {
                                *entry -= v;
                                *entry
                            }
                        };
                        if let Err(e) = state.queue_metric(timestamp, key_name, value) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        should_flush = state.should_flush();
                    }
                    Ok(Event::UpdateHistogram(timestamp, key, value)) => {
                        let key_name = key.name();
                        // Histogram samples are stored as-is.
                        if let Err(e) = state.queue_metric(timestamp, key_name, value) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        should_flush = state.should_flush();
                    }
                    Ok(Event::RequestSummaryFromSignpost {
                        signpost_key,
                        keys,
                        tx,
                    }) => {
                        // Flush first so the summary query sees every sample
                        // queued so far.
                        match state.flush() {
                            Ok(()) => match state
                                .metrics_summary_for_signpost_and_keys(signpost_key, keys)
                            {
                                Ok(metrics) => {
                                    if tx.send(Ok(metrics)).is_err() {
                                        error!(
                                            "Failed to respond with metrics results, discarding"
                                        );
                                    }
                                }
                                Err(e) => {
                                    if let Err(e) = tx.send(Err(e)) {
                                        error!(
                                            "Failed to respond with metrics error result, discarding: {e:?}"
                                        );
                                    }
                                }
                            },
                            Err(e) => {
                                let err = MetricsError::from(e);
                                error!(
                                    "Failed to flush pending metrics before summary request: {err:?}"
                                );
                                if let Err(send_err) = tx.send(Err(err)) {
                                    error!(
                                        "Failed to respond with metrics flush error result, discarding: {send_err:?}"
                                    );
                                }
                            }
                        }
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        // Idle for a full interval: flush whatever is queued.
                        should_flush = true;
                    }
                    Err(RecvTimeoutError::Disconnected) => {
                        warn!("SQLiteExporter channel disconnected, exiting worker");
                        should_flush = true;
                        should_exit = true;
                    }
                }
                if time_based_flush || should_flush {
                    if time_based_flush {
                        debug!("Flushing due to elapsed time ({}s)", flush_duration.as_secs());
                    }
                    if let Err(e) = state.flush() {
                        error!("Error flushing metrics: {}", e);
                    }
                }
                if state.should_housekeep() {
                    if let Err(e) = state.housekeep() {
                        error!("Failed running house keeping: {:?}", e);
                    }
                }
                if should_exit {
                    break;
                }
            }
        })
        .unwrap()
}
impl SqliteExporter {
pub fn new<P: AsRef<Path>>(
flush_interval: Duration,
keep_duration: Option<Duration>,
path: P,
) -> Result<Self> {
let mut db = setup_db_or_reset(path)?;
Self::housekeeping(&mut db, keep_duration, None, true);
let (sender, receiver) = std::sync::mpsc::sync_channel(BACKGROUND_CHANNEL_LIMIT);
let thread = run_worker(db, receiver, flush_interval);
let exporter = SqliteExporter {
thread: Some(thread),
sender,
};
Ok(exporter)
}
pub fn set_periodic_housekeeping(
&self,
periodic_duration: Option<Duration>,
retention: Option<Duration>,
record_limit: Option<usize>,
) {
if let Err(e) = self.sender.send(Event::SetHousekeeping {
retention_period: retention,
housekeeping_period: periodic_duration,
record_limit,
}) {
error!("Failed to set house keeping settings: {:?}", e);
}
}
fn housekeeping(
db: &mut SqliteConnection,
keep_duration: Option<Duration>,
record_limit: Option<usize>,
vacuum: bool,
) {
use crate::schema::metrics::dsl::*;
use diesel::dsl::count;
if let Some(keep_duration) = keep_duration {
match SystemTime::UNIX_EPOCH.elapsed() {
Ok(now) => {
let cutoff = now - keep_duration;
trace!("Deleting data {}s old", keep_duration.as_secs());
if let Err(e) =
diesel::delete(metrics.filter(timestamp.le(cutoff.as_secs_f64())))
.execute(db)
{
error!("Failed to remove old metrics data: {}", e);
}
if vacuum {
if let Err(e) = sql_query("VACUUM").execute(db) {
error!("Failed to vacuum SQLite DB: {:?}", e);
}
}
}
Err(e) => {
error!(
"System time error, skipping metrics-sqlite housekeeping: {}",
e
);
}
}
}
if let Some(record_limit) = record_limit {
trace!("Checking for records over {} limit", record_limit);
match metrics.select(count(id)).first::<i64>(db) {
Ok(records) => {
let records = records as usize;
if records > record_limit {
let excess = records - record_limit + (record_limit / 4); trace!(
"Exceeded limit! {} > {}, deleting {} oldest",
records, record_limit, excess
);
let query = format!(
"DELETE FROM metrics WHERE id IN (SELECT id FROM metrics ORDER BY timestamp ASC LIMIT {excess});"
);
if let Err(e) = sql_query(query).execute(db) {
error!("Failed to delete excessive records: {:?}", e);
}
}
}
Err(e) => {
error!("Failed to get record count: {:?}", e);
}
}
}
}
pub fn install(self) -> Result<SqliteExporterHandle, SetRecorderError<Self>> {
let handle = SqliteExporterHandle {
sender: self.sender.clone(),
};
metrics::set_global_recorder(self)?;
Ok(handle)
}
}
impl Drop for SqliteExporter {
    /// Ask the worker to flush and exit, then join it so queued metrics are
    /// written before the exporter disappears.
    fn drop(&mut self) {
        // Ignore send failure: the worker may already have exited after a
        // channel disconnect.
        let _ = self.sender.send(Event::Stop);
        // `if let` instead of `.unwrap()`: Drop must never panic (a panic
        // while already unwinding aborts the process).
        if let Some(handle) = self.thread.take() {
            let _ = handle.join();
        }
    }
}
#[cfg(test)]
mod tests {
    use crate::SqliteExporter;
    use std::time::{Duration, Instant};
    /// Hammer the installed recorder from several threads to exercise the
    /// worker's channel and flush path under contention.
    #[test]
    fn test_threading() {
        use std::thread;
        // Use a temp-dir database so the test doesn't leave `metrics.db`
        // (and its -wal/-shm sidecars) in the working directory.
        let db_path = std::env::temp_dir().join("metrics_sqlite_test_threading.db");
        SqliteExporter::new(Duration::from_millis(500), None, &db_path)
            .unwrap()
            .install()
            .unwrap();
        let joins: Vec<thread::JoinHandle<()>> = (0..5)
            .map(|_| {
                thread::spawn(move || {
                    let start = Instant::now();
                    // Run for a few flush intervals — long enough to trigger
                    // both size- and time-based flushes without a 5s test.
                    loop {
                        metrics::gauge!("rate").set(1.0);
                        metrics::counter!("hits").increment(1);
                        metrics::histogram!("histogram").record(5.0);
                        if start.elapsed().as_secs() >= 2 {
                            break;
                        }
                    }
                })
            })
            .collect();
        for j in joins {
            j.join().unwrap();
        }
    }
}