use std::cell::Cell;
use std::collections::HashMap;
use std::fmt;
use std::sync::{Arc, Mutex};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;
use std::time::{Duration, SystemTime};
use super::recv::Collector;
use super::metric::{CollectedMetric, Id};
mod aggregate;
use self::aggregate::AggregatedMetric;
/// One stored data point: a `SystemTime` timestamp paired with an `i32` value.
type Timeseries = (SystemTime, i32);
/// Configuration accepted by `Db::new`.
///
/// `Default` leaves every option unset, which lets `Db::new` apply its own defaults.
#[derive(Clone, Debug, Default)]
pub struct DbOptions {
    /// Time to wait between two aggregation runs.
    ///
    /// `None` means "use the default chosen by `Db::new`" (10 seconds).
    pub aggregation_interval: Option<Duration>,
}
/// Metrics store: buffers collected metrics, aggregates them on demand, keeps the
/// aggregated points per key and forwards each aggregation result to subscribers.
///
/// NOTE(review): several fields are `Mutex<Cell<_>>`; the `MutexGuard` already hands out
/// `&mut` access, so the `Cell` layer looks redundant — consider removing it.
pub struct Db {
/// Sending half of the collection channel; `collector` clones it for each `Collector`.
collection_sender: Mutex<Sender<Vec<CollectedMetric>>>,
/// Receiving half of the collection channel; drained by `sync_recv`.
collection_receiver: Mutex<Receiver<Vec<CollectedMetric>>>,
/// Metrics appended by `collect` and taken (leaving an empty buffer) by `aggregate`.
collected_metrics: Mutex<Cell<Vec<CollectedMetric>>>,
/// Sleep time between two `aggregate` calls in `sync_aggregate`.
aggregation_interval: Duration,
/// Senders registered through `aggregation_subscribe`; `aggregate` sends to each of them.
aggregation_subscribers: Mutex<Cell<Vec<Sender<Arc<Vec<AggregatedMetric>>>>>>,
/// Aggregated data points grouped by key; `aggregate` skips storing when this is `None`.
/// `new` always initializes it to `Some`.
aggregated_metrics: Option<Mutex<Cell<HashMap<AggregatedKey, Vec<Timeseries>>>>>,
}
impl Db {
pub fn new(options: DbOptions) -> Db {
let aggregation_interval = options.aggregation_interval.unwrap_or_else(|| Duration::new(10, 0));
let (send, recv) = channel();
Db {
collection_sender: Mutex::new(send),
collection_receiver: Mutex::new(recv),
collected_metrics: Mutex::new(Cell::new(vec![])),
aggregation_interval,
aggregation_subscribers: Mutex::new(Cell::new(vec![])),
aggregated_metrics: Some(Mutex::new(Cell::new(HashMap::new()))),
}
}
pub fn collector(&self) -> Collector {
let sender = {
self.collection_sender.lock().unwrap().clone()
};
Collector::new(sender)
}
pub fn sync_recv(&self) {
let receiver = self.collection_receiver.lock().unwrap();
for metrics in receiver.iter() {
self.collect(metrics)
}
}
pub fn sync_aggregate(&self) {
loop {
self.aggregate();
thread::sleep(self.aggregation_interval);
}
}
pub fn collect(&self, metrics: Vec<CollectedMetric>) {
let mut cell = self.collected_metrics.lock().unwrap();
cell.get_mut().extend(metrics);
}
pub fn aggregate(&self) {
let collected_metrics = {
let cell = self.collected_metrics.lock().unwrap();
cell.replace(Vec::new())
};
let grouped = aggregate::group(collected_metrics);
let aggregated = aggregate::aggregate(grouped);
if let Some(ref mutex) = self.aggregated_metrics {
let mut cell = mutex.lock().unwrap();
let aggregated_metrics = cell.get_mut();
for metric in &aggregated {
let (key, timeseries) = metric.into();
let values = aggregated_metrics.entry(key).or_insert_with(|| vec![]);
values.push(timeseries)
}
}
let mut cell = self.aggregation_subscribers.lock().unwrap();
let subscribers = cell.get_mut();
let ptr = Arc::new(aggregated);
for subscriber in subscribers {
let _ = subscriber.send(ptr.clone());
}
}
pub fn aggregation_subscribe(&self) -> Receiver<Arc<Vec<AggregatedMetric>>> {
let (send, recv) = channel();
let mut cell = self.aggregation_subscribers.lock().unwrap();
let subscribers = cell.get_mut();
subscribers.push(send);
recv
}
}
/// Hand-written `Debug`: prints the configured aggregation interval only.
///
/// The channel ends and the mutex-guarded buffers are intentionally left out, so
/// formatting a `Db` never takes a lock.
impl fmt::Debug for Db {
    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
        fmt.debug_struct("Db")
            .field("aggregation_interval", &self.aggregation_interval)
            .finish()
    }
}
/// Map key for stored aggregated data points: the metric kind together with its `Id`.
#[derive(Eq, Hash, PartialEq)]
enum AggregatedKey {
/// Key built from an `AggregatedMetric::Count` and its id.
Count(Id),
/// Key built from an `AggregatedMetric::Gauge` and its id.
Gauge(Id),
}
impl<'a> Into<(AggregatedKey, (SystemTime, i32))> for &'a AggregatedMetric {
fn into(self) -> (AggregatedKey, (SystemTime, i32)) {
use self::AggregatedMetric::*;
match self {
&Count(time, ref id, value) => (AggregatedKey::Count(id.to_owned()), (time, value)),
&Gauge(time, ref id, value) => (AggregatedKey::Gauge(id.to_owned()), (time, value)),
}
}
}