metrics_datadog_exporter/
exporter.rs

//! DataDog HTTP API exporter
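//!
//! A minimal usage sketch. The builder calls below are illustrative only; the
//! actual entry point lives in `crate::builder` and may differ:
//!
//! ```ignore
//! use std::time::Duration;
//!
//! // Hypothetical builder API: install the recorder and get an exporter.
//! let exporter = DataDogBuilder::default()
//!     .api_key("<DD_API_KEY>".to_string())
//!     .build()?;
//!
//! // Ship collected metrics to the DataDog API every 10 seconds.
//! let (_exporter, _handle) = exporter.schedule(Duration::from_secs(10));
//! ```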

use flate2::write::GzEncoder;
use flate2::Compression;
use futures::future::try_join_all;
use std::io::Write;
use std::sync::Arc;
use std::time::Duration;

use itertools::Itertools;
use metrics::{Key, Label};
use metrics_util::registry::{AtomicStorage, Registry};
use reqwest::header::CONTENT_ENCODING;
use reqwest::{blocking, Client};
use tokio::spawn;
use tokio::task::JoinHandle;
use tokio_schedule::{every, Job};
use tracing::Level;
use tracing::{debug, enabled, warn};

use crate::builder::DataDogConfig;
use crate::data::{DataDogApiPost, DataDogMetric, DataDogSeries};
use crate::{Error, Result};

// Size limits from https://docs.datadoghq.com/api/latest/metrics/#submit-metrics:
// a payload may be at most 3.2 MB, and a compressed payload may be at most
// 62,914,560 bytes once decompressed.
const MAX_PAYLOAD_BYTES: usize = 3_200_000;
const MAX_DECOMPRESSED_PAYLOAD: usize = 62_914_560;

fn send_blocking(
    metrics: Vec<DataDogMetric>,
    gzip: bool,
    api_host: String,
    api_key: String,
    client: blocking::Client,
) -> Result<(), Error> {
    if !metrics.is_empty() {
        let requests = metric_requests(metrics, gzip)?;
        for request in requests {
            let mut request = client
                .post(format!("{}/series", api_host))
                .header("DD-API-KEY", api_key.to_owned())
                .body(request);
            if gzip {
                request = request.header(CONTENT_ENCODING, "gzip");
            }

            let response = request.send()?.error_for_status()?;
            if enabled!(Level::DEBUG) {
                let status = response.status();
                let message = response.text()?;
                debug!(status = %status, message = %message, "Response from DataDog API")
            }
        }
    }
    Ok(())
}

async fn send_async(
    metrics: Vec<DataDogMetric>,
    gzip: bool,
    api_host: &str,
    api_key: &str,
    client: &Client,
) -> Result<(), Error> {
    if !metrics.is_empty() {
        let requests = metric_requests(metrics, gzip)?;
        let responses = try_join_all(requests.into_iter().map(|request| async {
            let mut request = client
                .post(format!("{}/series", api_host))
                .header("DD-API-KEY", api_key)
                .body(request);
            if gzip {
                request = request.header(CONTENT_ENCODING, "gzip");
            }
            let response = request.send().await?.error_for_status()?;
            let status = response.status();
            let message = response.text().await?;
            Ok::<_, reqwest::Error>((status, message))
        }))
        .await?;

        if enabled!(Level::DEBUG) {
            responses.into_iter().for_each(|(status, message)| {
                debug!(status = %status, message = %message, "Response from DataDog API")
            });
        }
    }
    Ok(())
}

fn metric_requests(metrics: Vec<DataDogMetric>, gzip: bool) -> Result<Vec<Vec<u8>>> {
    let series = metrics
        .into_iter()
        .flat_map(DataDogSeries::new)
        .collect_vec();
    if gzip {
        split_and_compress_series(&series)
    } else {
        split_series(&series)
    }
}

fn split_series(series: &[DataDogSeries]) -> Result<Vec<Vec<u8>>> {
    let body = serde_json::to_vec(&DataDogApiPost { series })?;
    // A single series that exceeds the limit cannot be split any further, so
    // return it as-is instead of recursing forever; the API will reject it.
    if body.len() < MAX_PAYLOAD_BYTES || series.len() <= 1 {
        Ok(vec![body])
    } else {
        let (left, right) = series.split_at(series.len() / 2);
        Ok(split_series(left)?
            .into_iter()
            .chain(split_series(right)?)
            .collect_vec())
    }
}

fn split_and_compress_series(series: &[DataDogSeries]) -> Result<Vec<Vec<u8>>> {
    fn split(series: &[DataDogSeries]) -> Result<Vec<Vec<u8>>> {
        let (left, right) = series.split_at(series.len() / 2);
        Ok(split_and_compress_series(left)?
            .into_iter()
            .chain(split_and_compress_series(right)?)
            .collect_vec())
    }

    let body = serde_json::to_vec(&DataDogApiPost { series })?;
    if body.len() > MAX_DECOMPRESSED_PAYLOAD && series.len() > 1 {
        split(series)
    } else {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(&body)?;
        let compressed = encoder.finish()?;
        // A single series that is still too large cannot be split any
        // further; return it as-is rather than recurse forever.
        if compressed.len() < MAX_PAYLOAD_BYTES || series.len() <= 1 {
            Ok(vec![compressed])
        } else {
            split(series)
        }
    }
}
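
// A self-contained, test-only sketch of the invariant the splitting logic
// above relies on: gzip encoding with `flate2` round-trips losslessly, so the
// size checks can be applied to the compressed bytes alone. The payload here
// is arbitrary JSON rather than a real `DataDogApiPost`.
#[cfg(test)]
mod compression_sketch {
    use super::*;
    use flate2::read::GzDecoder;
    use std::io::Read;

    #[test]
    fn gzip_round_trip_preserves_payload() -> Result<()> {
        let body = serde_json::to_vec(&serde_json::json!({ "series": [] }))?;

        // Compress as `split_and_compress_series` does.
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(&body)?;
        let compressed = encoder.finish()?;
        assert!(compressed.len() < MAX_PAYLOAD_BYTES);

        // Decompressing must yield the original JSON bytes.
        let mut decompressed = Vec::new();
        GzDecoder::new(compressed.as_slice()).read_to_end(&mut decompressed)?;
        assert_eq!(decompressed, body);
        Ok(())
    }
}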

/// Metric exporter
pub struct DataDogExporter {
    registry: Arc<Registry<Key, AtomicStorage>>,
    write_to_stdout: bool,
    write_to_api: bool,
    api_host: String,
    api_client: Option<Client>,
    api_key: Option<String>,
    tags: Vec<Label>,
    gzip: bool,
}

impl DataDogExporter {
    pub(crate) fn new(
        registry: Arc<Registry<Key, AtomicStorage>>,
        client: Option<Client>,
        config: DataDogConfig,
    ) -> Self {
        DataDogExporter {
            registry,
            write_to_stdout: config.write_to_stdout,
            write_to_api: config.write_to_api,
            api_host: config.api_host,
            api_client: client,
            api_key: config.api_key,
            tags: config.tags,
            gzip: config.gzip,
        }
    }

    /// Write metrics every [`Duration`]
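    ///
    /// # Example
    ///
    /// A minimal sketch; constructing the exporter is done through the
    /// crate's builder and is elided here.
    ///
    /// ```ignore
    /// // Flush every 10 seconds on the current tokio runtime.
    /// let (exporter, handle) = exporter.schedule(Duration::from_secs(10));
    /// // `exporter` can still be flushed manually; abort `handle` to stop.
    /// ```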
    pub fn schedule(self, interval: Duration) -> (Arc<Self>, JoinHandle<()>) {
        let exporter = Arc::new(self);
        let scheduled_exporter = exporter.clone();
        let every = every(interval.as_secs() as u32).seconds().perform(move || {
            let exporter = scheduled_exporter.clone();
            async move {
                if let Err(e) = exporter.flush().await {
                    warn!(error = ?e, "Failed to flush metrics");
                }
            }
        });
        (exporter, spawn(every))
    }

    /// Collect metrics
    ///
    /// Note: this clears the registry, including histogram observations
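    ///
    /// ```ignore
    /// // Sketch: drain the current metrics without sending them anywhere.
    /// let metrics = exporter.collect();
    /// println!("collected {} metrics", metrics.len());
    /// ```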
    pub fn collect(&self) -> Vec<DataDogMetric> {
        let counters = self
            .registry
            .get_counter_handles()
            .into_iter()
            .group_by(|(k, _)| k.clone())
            .into_iter()
            .map(|(key, values)| {
                DataDogMetric::from_counter(
                    key,
                    values.into_iter().map(|(_, v)| v).collect_vec(),
                    &self.tags,
                )
            })
            .collect_vec();

        let gauges = self
            .registry
            .get_gauge_handles()
            .into_iter()
            .group_by(|(k, _)| k.clone())
            .into_iter()
            .map(|(key, values)| {
                DataDogMetric::from_gauge(
                    key,
                    values.into_iter().map(|(_, v)| v).collect_vec(),
                    &self.tags,
                )
            })
            .collect_vec();

        let histograms = self
            .registry
            .get_histogram_handles()
            .into_iter()
            .group_by(|(k, _)| k.clone())
            .into_iter()
            .map(|(key, values)| {
                DataDogMetric::from_histogram(
                    key,
                    values.into_iter().map(|(_, v)| v).collect_vec(),
                    &self.tags,
                )
            })
            .collect_vec();

        self.registry.clear();

        counters
            .into_iter()
            .chain(gauges)
            .chain(histograms)
            .collect_vec()
    }

    /// Flush metrics
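    ///
    /// Collects (and clears) the current registry contents, then writes them
    /// to stdout and/or the DataDog API, as configured.
    ///
    /// ```ignore
    /// // Sketch: one manual flush, e.g. before shutdown.
    /// exporter.flush().await?;
    /// ```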
    pub async fn flush(&self) -> Result<()> {
        let metrics: Vec<DataDogMetric> = self.collect();
        debug!("Flushing {} metrics", metrics.len());

        if self.write_to_stdout {
            self.write_to_stdout(metrics.as_slice())?;
        }

        if self.write_to_api {
            self.write_to_api(metrics).await?;
        }

        Ok(())
    }

    fn write_to_stdout(&self, metrics: &[DataDogMetric]) -> Result<()> {
        for metric in metrics {
            for m in metric.to_metric_lines() {
                println!("{}", serde_json::to_string(&m)?)
            }
        }
        Ok(())
    }

    async fn write_to_api(&self, metrics: Vec<DataDogMetric>) -> Result<(), Error> {
        send_async(
            metrics,
            self.gzip,
            &self.api_host,
            self.api_key
                .as_ref()
                .expect("api_key is required when write_to_api is enabled"),
            self.api_client
                .as_ref()
                .expect("api_client is required when write_to_api is enabled"),
        )
        .await
    }
}

impl Drop for DataDogExporter {
    fn drop(&mut self) {
        let metrics = self.collect();
        if self.write_to_stdout {
            if let Err(e) = self.write_to_stdout(metrics.as_slice()) {
                eprintln!("Failed to flush to stdout: {}", e)
            }
        }

        if self.write_to_api {
            let host = self.api_host.clone();
            let api_key = self
                .api_key
                .as_ref()
                .expect("api_key is required when write_to_api is enabled")
                .clone();
            let compression = self.gzip;
            // reqwest::blocking panics when used inside an async runtime, so
            // run the final flush on a dedicated thread.
            let joined = std::thread::spawn(move || {
                send_blocking(
                    metrics,
                    compression,
                    host,
                    api_key,
                    blocking::Client::default(),
                )
            })
            .join();

            match joined {
                Ok(Ok(())) => (),
                Ok(Err(e)) => eprintln!("Failed to flush metrics in drop: {}", e),
                Err(_) => eprintln!("Failed to join flush thread in drop"),
            }
        }
    }
}