1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use crate::data::{DataDogApiPost, DataDogMetric, DataDogSeries};
use crate::{Error, Result};
use log::error;
use metrics::{Key, Label};
use metrics_util::{Handle, NotTracked, Registry};
use reqwest::{Client, Response};
use std::sync::Arc;
use std::time::Duration;
use tokio::spawn;
use tokio::task::JoinHandle;
use tokio_schedule::{every, Job};

/// Metric exporter
pub struct DataDogExporter {
    registry: Arc<Registry<Key, Handle, NotTracked<Handle>>>,
    write_to_stdout: bool,
    write_to_api: bool,
    api_host: String,
    api_client: Option<Client>,
    api_key: Option<String>,
    tags: Vec<Label>,
}

impl DataDogExporter {
    pub(crate) fn new(
        registry: Arc<Registry<Key, Handle, NotTracked<Handle>>>,
        write_to_stdout: bool,
        write_to_api: bool,
        api_host: String,
        api_client: Option<Client>,
        api_key: Option<String>,
        tags: Vec<Label>,
    ) -> Self {
        DataDogExporter {
            registry,
            write_to_stdout,
            write_to_api,
            api_host,
            api_client,
            api_key,
            tags,
        }
    }

    /// Write metrics every [`Duration`]
    pub fn schedule(&'static self, interval: Duration) -> JoinHandle<()> {
        let every = every(interval.as_secs() as u32)
            .seconds()
            .perform(move || async move {
                let result = self.flush().await;
                if let Err(e) = result {
                    error!("Failed to flush metrics: {}", e);
                }
            });
        spawn(every)
    }

    /// Collect metrics
    ///
    /// Note: This will clear histogram observations    
    pub fn collect(&self) -> Vec<DataDogMetric> {
        self.registry
            .get_handles()
            .iter()
            .map(|((kind, key), (_, handle))| {
                DataDogMetric::from_metric(kind, key, handle, &self.tags)
            })
            .collect()
    }

    /// Flush metrics
    pub async fn flush(&self) -> Result<()> {
        let metrics: Vec<DataDogMetric> = self.collect();
        log::debug!("Flushing {} metrics", metrics.len());

        if self.write_to_stdout {
            for metric in &metrics {
                self.write_to_stdout(metric)?;
            }
        }

        if self.write_to_api {
            self.write_to_api(&metrics).await?;
        }

        Ok(())
    }

    fn write_to_stdout(&self, metric: &DataDogMetric) -> Result<()> {
        for m in metric.to_metric_lines() {
            println!("{}", serde_json::to_string(&m)?)
        }
        Ok(())
    }

    async fn write_to_api(&self, metrics: &[DataDogMetric]) -> Result<Response, Error> {
        let body = &DataDogApiPost {
            series: metrics
                .iter()
                .map(|m| m.into())
                .collect::<Vec<DataDogSeries>>(),
        };
        self.api_client
            .as_ref()
            .unwrap()
            .post(format!("{}/series", self.api_host))
            .header("DD-API-KEY", self.api_key.as_ref().unwrap())
            .json(&body)
            .send()
            .await?
            .error_for_status()
            .map_err(crate::Error::from)
    }
}