1#![deny(missing_docs)]
2#[macro_use]
5extern crate diesel;
6#[macro_use]
7extern crate diesel_migrations;
8#[macro_use]
9extern crate log;
10
11use diesel::prelude::*;
12use diesel::{insert_into, sql_query};
13
14use metrics::{GaugeValue, Key, KeyName, SetRecorderError, SharedString, Unit};
15
16use diesel_migrations::{EmbeddedMigrations, MigrationHarness};
17use std::sync::Arc;
18use std::{
19 collections::{HashMap, VecDeque},
20 path::Path,
21 sync::mpsc::{Receiver, RecvTimeoutError, SyncSender},
22 thread::{self, JoinHandle},
23 time::{Duration, Instant, SystemTime},
24};
25use thiserror::Error;
26
/// Maximum number of queued samples before a flush to SQLite is forced.
const FLUSH_QUEUE_LIMIT: usize = 1000;
/// Capacity of the bounded channel between recorder handles and the worker thread.
const BACKGROUND_CHANNEL_LIMIT: usize = 8000;
30
/// Errors produced while opening, migrating, or querying the metrics database.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Establishing the SQLite connection failed.
    #[error("Database error: {0}")]
    DbConnectionError(#[from] ConnectionError),
    /// Running the embedded schema migrations failed.
    #[error("Migration error: {0}")]
    MigrationError(Box<dyn std::error::Error + Send + Sync>),
    /// A diesel query against the database failed.
    #[error("Error querying DB: {0}")]
    QueryError(#[from] diesel::result::Error),
    /// The database path could not be converted to a UTF-8 string.
    #[error("Invalid database path")]
    InvalidDatabasePath,
    /// I/O failure (only with the `csv` feature enabled).
    #[cfg(feature = "csv")]
    #[error("IO Error: {0}")]
    IoError(#[from] std::io::Error),
    /// CSV serialization failure (only with the `csv` feature enabled).
    #[cfg(feature = "csv")]
    #[error("CSV Error: {0}")]
    CsvError(#[from] csv::Error),
    /// The database contains no metric samples.
    #[error("Database has no metrics stored in it")]
    EmptyDatabase,
    /// The requested metric key does not exist in the database.
    #[error("Metric key {0} not found in database")]
    KeyNotFound(String),
}
61pub type Result<T, E = MetricsError> = std::result::Result<T, E>;
63
64mod metrics_db;
65mod models;
66mod recorder;
67mod schema;
68
69use crate::recorder::Handle;
70pub use metrics_db::{MetricsDb, Session};
71pub use models::{Metric, MetricKey, NewMetric};
72
73pub(crate) const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
74
75fn setup_db<P: AsRef<Path>>(path: P) -> Result<SqliteConnection> {
76 let url = path
77 .as_ref()
78 .to_str()
79 .ok_or(MetricsError::InvalidDatabasePath)?;
80 let mut db = SqliteConnection::establish(url)?;
81 db.run_pending_migrations(MIGRATIONS)
82 .map_err(|e| MetricsError::MigrationError(e))?;
83
84 Ok(db)
85}
/// Which of the three `metrics`-crate instrument kinds an event refers to.
enum RegisterType {
    Counter,
    Gauge,
    Histogram,
}
91
/// Messages sent from recorder handles to the background worker thread.
enum Event {
    /// Flush pending samples and terminate the worker.
    Stop,
    /// Store or refresh a key's unit and description in the DB.
    DescribeKey(RegisterType, KeyName, Option<Unit>, SharedString),
    /// Announce a new key/handle pair (currently a no-op in the worker).
    RegisterKey(RegisterType, Key, Arc<Handle>),
    /// Add a delta to a counter; the Duration is the sample's time since the Unix epoch.
    IncrementCounter(Duration, Key, u64),
    /// Set a counter to an absolute value.
    AbsoluteCounter(Duration, Key, u64),
    /// Apply an absolute/increment/decrement update to a gauge.
    UpdateGauge(Duration, Key, GaugeValue),
    /// Record a single histogram observation.
    UpdateHistogram(Duration, Key, f64),
    /// Configure periodic retention cleanup on the worker.
    SetHousekeeping {
        // Delete samples older than this; None disables age-based cleanup.
        retention_period: Option<Duration>,
        // How often housekeeping runs; None disables it.
        housekeeping_period: Option<Duration>,
        // Maximum number of rows to keep; None disables the cap.
        record_limit: Option<usize>,
    },
}
106
/// Exports metrics recorded through the `metrics` facade to a SQLite
/// database via a background worker thread.
pub struct SqliteExporter {
    // Worker join handle; Option so Drop can take() and join it exactly once.
    thread: Option<JoinHandle<()>>,
    // Bounded channel feeding events to the worker thread.
    sender: SyncSender<Event>,
}
/// Mutable state owned exclusively by the background worker thread.
struct InnerState {
    // Open SQLite connection (never shared outside the worker).
    db: SqliteConnection,
    // When housekeeping last ran.
    last_housekeeping: Instant,
    // Housekeeping interval; None disables periodic housekeeping.
    housekeeping: Option<Duration>,
    // Maximum sample age to retain; None disables age-based cleanup.
    retention: Option<Duration>,
    // Maximum row count to retain; None disables the cap.
    record_limit: Option<usize>,
    // Flush at least this often.
    flush_duration: Duration,
    // When the queue was last flushed to the DB.
    last_flush: Instant,
    // Last known gauge values, needed to apply increment/decrement deltas.
    last_values: HashMap<Key, f64>,
    // Running counter totals (counters are stored cumulatively).
    counters: HashMap<Key, u64>,
    // Cache of metric-key name -> DB row id to avoid repeated lookups.
    key_ids: HashMap<String, i64>,
    // Samples waiting to be written in the next flush transaction.
    queue: VecDeque<NewMetric>,
}
125impl InnerState {
126 fn new(flush_duration: Duration, db: SqliteConnection) -> Self {
127 InnerState {
128 db,
129 last_housekeeping: Instant::now(),
130 housekeeping: None,
131 retention: None,
132 record_limit: None,
133 flush_duration,
134 last_flush: Instant::now(),
135 last_values: HashMap::new(),
136 counters: HashMap::new(),
137 key_ids: HashMap::new(),
138 queue: VecDeque::with_capacity(FLUSH_QUEUE_LIMIT),
139 }
140 }
141 fn set_housekeeping(
142 &mut self,
143 retention: Option<Duration>,
144 housekeeping_duration: Option<Duration>,
145 record_limit: Option<usize>,
146 ) {
147 self.retention = retention;
148 self.housekeeping = housekeeping_duration;
149 self.last_housekeeping = Instant::now();
150 self.record_limit = record_limit;
151 }
152 fn should_housekeep(&self) -> bool {
153 match self.housekeeping {
154 Some(duration) => self.last_housekeeping.elapsed() > duration,
155 None => false,
156 }
157 }
158 fn housekeep(&mut self) -> Result<(), diesel::result::Error> {
159 SqliteExporter::housekeeping(&mut self.db, self.retention, self.record_limit, false);
160 self.last_housekeeping = Instant::now();
161 Ok(())
162 }
163 fn should_flush(&self) -> bool {
164 if self.last_flush.elapsed() > self.flush_duration {
165 debug!("Flushing due to {}s timeout", self.flush_duration.as_secs());
166 true
167 } else {
168 self.queue.len() >= FLUSH_QUEUE_LIMIT
169 }
170 }
171 fn flush(&mut self) -> Result<(), diesel::result::Error> {
172 use crate::schema::metrics::dsl::metrics;
173 let db = &mut self.db;
175 let queue = self.queue.drain(..);
176 db.transaction::<_, diesel::result::Error, _>(|db| {
177 for rec in queue {
178 insert_into(metrics).values(&rec).execute(db)?;
179 }
180 Ok(())
181 })?;
182 self.last_flush = Instant::now();
183 Ok(())
184 }
185 fn queue_metric(&mut self, timestamp: Duration, key: &str, value: f64) -> Result<()> {
186 let metric_key_id = match self.key_ids.get(key) {
187 Some(key) => *key,
188 None => {
189 debug!("Looking up {}", key);
190 let key_id = MetricKey::key_by_name(key, &mut self.db)?.id;
191 self.key_ids.insert(key.to_string(), key_id);
192 key_id
193 }
194 };
195 let metric = NewMetric {
196 timestamp: timestamp.as_secs_f64(),
197 metric_key_id,
198 value: value as _,
199 };
200 self.queue.push_back(metric);
201 Ok(())
202 }
203}
204
/// Spawns the background thread that owns the SQLite connection and
/// services all recorder events.
///
/// The worker blocks on `recv_timeout(flush_duration)` so queued samples
/// are flushed at least once per interval even when no events arrive. It
/// exits after `Event::Stop` or when every sender has been dropped.
fn run_worker(
    db: SqliteConnection,
    receiver: Receiver<Event>,
    flush_duration: Duration,
) -> JoinHandle<()> {
    thread::Builder::new()
        .name("metrics-sqlite: worker".to_string())
        .spawn(move || {
            let mut state = InnerState::new(flush_duration, db);
            info!("SQLite worker started");
            loop {
                // Each match arm yields (should_flush, should_exit).
                let (should_flush, should_exit) = match receiver.recv_timeout(flush_duration) {
                    Ok(Event::Stop) => {
                        info!("Stopping SQLiteExporter worker, flushing & exiting");
                        (true, true)
                    }
                    Ok(Event::SetHousekeeping {
                        retention_period,
                        housekeeping_period,
                        record_limit,
                    }) => {
                        state.set_housekeeping(retention_period, housekeeping_period, record_limit);
                        (false, false)
                    }
                    Ok(Event::DescribeKey(_key_type, key, unit, desc)) => {
                        info!("Describing key {:?}", key);
                        if let Err(e) = MetricKey::create_or_update(
                            key.as_str(),
                            unit,
                            Some(desc.as_ref()),
                            &mut state.db,
                        ) {
                            error!("Failed to create key entry: {:?}", e);
                        }
                        (false, false)
                    }
                    Ok(Event::RegisterKey(_key_type, _key, _handle)) => {
                        // Key rows are created lazily on first use; nothing to do here.
                        (false, false)
                    }
                    Ok(Event::IncrementCounter(timestamp, key, value)) => {
                        let key_str = key.name().to_string();
                        // Counters are persisted as their cumulative total.
                        let entry = state.counters.entry(key).or_insert(0);
                        let value = {
                            *entry += value;
                            *entry
                        };
                        if let Err(e) = state.queue_metric(timestamp, &key_str, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }

                        (state.should_flush(), false)
                    }
                    Ok(Event::AbsoluteCounter(timestamp, key, value)) => {
                        let key_str = key.name().to_string();
                        state.counters.insert(key, value);
                        if let Err(e) = state.queue_metric(timestamp, &key_str, value as _) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::UpdateGauge(timestamp, key, value)) => {
                        let key_str = key.name().to_string();
                        // Gauges need the previous value to apply relative updates.
                        let entry = state.last_values.entry(key).or_insert(0.0);
                        let value = match value {
                            GaugeValue::Absolute(v) => {
                                *entry = v;
                                *entry
                            }
                            GaugeValue::Increment(v) => {
                                *entry += v;
                                *entry
                            }
                            GaugeValue::Decrement(v) => {
                                *entry -= v;
                                *entry
                            }
                        };
                        if let Err(e) = state.queue_metric(timestamp, &key_str, value) {
                            error!("Error queueing metric: {:?}", e);
                        }
                        (state.should_flush(), false)
                    }
                    Ok(Event::UpdateHistogram(timestamp, key, value)) => {
                        let key_str = key.name().to_string();
                        if let Err(e) = state.queue_metric(timestamp, &key_str, value) {
                            error!("Error queueing metric: {:?}", e);
                        }

                        (state.should_flush(), false)
                    }
                    Err(RecvTimeoutError::Timeout) => {
                        // Idle for a full interval: flush whatever is queued.
                        debug!("Flushing due to {}s timeout", flush_duration.as_secs());
                        (true, false)
                    }
                    Err(RecvTimeoutError::Disconnected) => {
                        warn!("SQLiteExporter channel disconnected, exiting worker");
                        (true, true)
                    }
                };
                if should_flush {
                    if let Err(e) = state.flush() {
                        error!("Error flushing metrics: {}", e);
                    }
                }
                if state.should_housekeep() {
                    if let Err(e) = state.housekeep() {
                        error!("Failed running house keeping: {:?}", e);
                    }
                }
                if should_exit {
                    break;
                }
            }
        })
        .unwrap()
}
322
impl SqliteExporter {
    /// Creates a new exporter writing to the SQLite database at `path`.
    ///
    /// Runs an initial housekeeping pass (deleting samples older than
    /// `keep_duration`, then VACUUMing) and spawns the background worker
    /// that flushes queued samples at least every `flush_interval`.
    ///
    /// # Errors
    /// Fails if the database cannot be opened or migrated.
    pub fn new<P: AsRef<Path>>(
        flush_interval: Duration,
        keep_duration: Option<Duration>,
        path: P,
    ) -> Result<Self> {
        let mut db = setup_db(path)?;
        Self::housekeeping(&mut db, keep_duration, None, true);
        let (sender, receiver) = std::sync::mpsc::sync_channel(BACKGROUND_CHANNEL_LIMIT);
        let thread = run_worker(db, receiver, flush_interval);
        let exporter = SqliteExporter {
            thread: Some(thread),
            sender,
        };
        Ok(exporter)
    }

    /// Enables periodic housekeeping on the worker thread.
    ///
    /// `periodic_duration` controls how often cleanup runs, `retention`
    /// is the maximum sample age to keep, and `record_limit` caps the row
    /// count; `None` disables the corresponding behavior. A failure to
    /// deliver the setting to the worker is logged, not returned.
    pub fn set_periodic_housekeeping(
        &self,
        periodic_duration: Option<Duration>,
        retention: Option<Duration>,
        record_limit: Option<usize>,
    ) {
        if let Err(e) = self.sender.send(Event::SetHousekeeping {
            retention_period: retention,
            housekeeping_period: periodic_duration,
            record_limit,
        }) {
            error!("Failed to set house keeping settings: {:?}", e);
        }
    }

    /// Deletes expired rows (by `keep_duration`) and excess rows (by
    /// `record_limit`), optionally running VACUUM afterwards. All failures
    /// are logged rather than returned.
    fn housekeeping(
        db: &mut SqliteConnection,
        keep_duration: Option<Duration>,
        record_limit: Option<usize>,
        vacuum: bool,
    ) {
        use crate::schema::metrics::dsl::*;
        use diesel::dsl::count;
        if let Some(keep_duration) = keep_duration {
            // Sample timestamps are stored as seconds since the Unix epoch,
            // so the cutoff is computed the same way.
            match SystemTime::UNIX_EPOCH.elapsed() {
                Ok(now) => {
                    let cutoff = now - keep_duration;
                    trace!("Deleting data {}s old", keep_duration.as_secs());
                    if let Err(e) =
                        diesel::delete(metrics.filter(timestamp.le(cutoff.as_secs_f64())))
                            .execute(db)
                    {
                        error!("Failed to remove old metrics data: {}", e);
                    }
                    if vacuum {
                        if let Err(e) = sql_query("VACUUM").execute(db) {
                            error!("Failed to vacuum SQLite DB: {:?}", e);
                        }
                    }
                }
                Err(e) => {
                    error!(
                        "System time error, skipping metrics-sqlite housekeeping: {}",
                        e
                    );
                }
            }
        }
        if let Some(record_limit) = record_limit {
            trace!("Checking for records over {} limit", record_limit);
            match metrics.select(count(id)).first::<i64>(db) {
                Ok(records) => {
                    let records = records as usize;
                    if records > record_limit {
                        // Delete an extra 25% of the limit so cleanup does
                        // not trigger again on every pass.
                        let excess = records - record_limit + (record_limit / 4);
                        trace!(
                            "Exceeded limit! {} > {}, deleting {} oldest",
                            records,
                            record_limit,
                            excess
                        );
                        // `excess` is a usize, not user input, so formatting it
                        // into the SQL string is injection-safe.
                        let query = format!("DELETE FROM metrics WHERE id IN (SELECT id FROM metrics ORDER BY timestamp ASC LIMIT {});", excess);
                        if let Err(e) = sql_query(query).execute(db) {
                            error!("Failed to delete excessive records: {:?}", e);
                        }
                    }
                }
                Err(e) => {
                    error!("Failed to get record count: {:?}", e);
                }
            }
        }
    }

    /// Installs this exporter as the global `metrics` recorder.
    ///
    /// # Errors
    /// Returns an error if another recorder is already installed.
    pub fn install(self) -> Result<(), SetRecorderError> {
        metrics::set_boxed_recorder(Box::new(self))
    }
}
431impl Drop for SqliteExporter {
432 fn drop(&mut self) {
433 let _ = self.sender.send(Event::Stop);
434 let _ = self.thread.take().unwrap().join();
435 }
436}
437
#[cfg(test)]
mod tests {
    use crate::SqliteExporter;
    use std::time::{Duration, Instant};

    /// Smoke test: installs the exporter as the global recorder and hammers
    /// it from five threads for ~5 seconds, exercising the worker's channel,
    /// flush, and all three instrument paths. Writes `metrics.db` into the
    /// current working directory.
    #[test]
    fn test_threading() {
        use std::thread;
        SqliteExporter::new(Duration::from_millis(500), None, "metrics.db")
            .unwrap()
            .install()
            .unwrap();
        let joins: Vec<thread::JoinHandle<()>> = (0..5)
            .map(|_| {
                thread::spawn(move || {
                    let start = Instant::now();
                    loop {
                        metrics::gauge!("rate", 1.0);
                        metrics::increment_counter!("hits");
                        metrics::histogram!("histogram", 5.0);
                        if start.elapsed().as_secs() >= 5 {
                            break;
                        }
                    }
                })
            })
            .collect();
        // Every thread must finish without panicking.
        for j in joins {
            j.join().unwrap();
        }
    }
}