// metrics_datadog_exporter/exporter.rs

use flate2::write::GzEncoder;
use flate2::Compression;
use futures::future::try_join_all;
use std::io::Write;
use std::sync::Arc;
use std::time::Duration;

use itertools::Itertools;
use metrics::{Key, Label};
use metrics_util::registry::{AtomicStorage, Registry};
use reqwest::header::CONTENT_ENCODING;
use reqwest::{blocking, Client};
use tokio::spawn;
use tokio::task::JoinHandle;
use tokio_schedule::{every, Job};
use tracing::{debug, enabled, warn, Level};

use crate::builder::DataDogConfig;
use crate::data::{DataDogApiPost, DataDogMetric, DataDogSeries};
use crate::{Error, Result};

/// Maximum payload size accepted by the DataDog series endpoint
/// (3.2 MB, compressed when compression is used).
const MAX_PAYLOAD_BYTES: usize = 3_200_000;
/// Maximum decompressed payload size accepted by the DataDog series
/// endpoint (62,914,560 bytes, i.e. 60 MiB).
const MAX_DECOMPRESSED_PAYLOAD: usize = 62_914_560;

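/// Posts metrics to the DataDog series endpoint with a blocking client.
/// Used from `Drop`, where an async runtime may not be available.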
fn send_blocking(
    metrics: Vec<DataDogMetric>,
    gzip: bool,
    api_host: String,
    api_key: String,
    client: blocking::Client,
) -> Result<(), Error> {
    if !metrics.is_empty() {
        let requests = metric_requests(metrics, gzip)?;
        for request in requests {
            let mut request = client
                .post(format!("{}/series", api_host))
                .header("DD-API-KEY", api_key.as_str())
                .body(request);
            if gzip {
                request = request.header(CONTENT_ENCODING, "gzip");
            }

            let response = request.send()?.error_for_status()?;
            if enabled!(Level::DEBUG) {
                let status = response.status();
                let message = response.text()?;
                debug!(status = %status, message = %message, "Response from DataDog API");
            }
        }
    }
    Ok(())
}

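/// Posts metrics to the DataDog series endpoint, sending all payload
/// chunks concurrently and failing fast on the first error.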
async fn send_async(
    metrics: Vec<DataDogMetric>,
    gzip: bool,
    api_host: &str,
    api_key: &str,
    client: &Client,
) -> Result<(), Error> {
    if !metrics.is_empty() {
        let requests = metric_requests(metrics, gzip)?;
        let responses = try_join_all(requests.into_iter().map(|request| async {
            let mut request = client
                .post(format!("{}/series", api_host))
                .header("DD-API-KEY", api_key)
                .body(request);
            if gzip {
                request = request.header(CONTENT_ENCODING, "gzip");
            }
            let response = request.send().await?.error_for_status()?;
            let status = response.status();
            let message = response.text().await?;
            Ok::<_, reqwest::Error>((status, message))
        }))
        .await?;

        if enabled!(Level::DEBUG) {
            responses.into_iter().for_each(|(status, message)| {
                debug!(status = %status, message = %message, "Response from DataDog API");
            });
        }
    }
    Ok(())
}

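/// Serializes metrics into one or more request bodies, splitting (and
/// optionally gzip-compressing) them to stay within the API payload limits.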
fn metric_requests(metrics: Vec<DataDogMetric>, gzip: bool) -> Result<Vec<Vec<u8>>> {
    let series = metrics
        .into_iter()
        .flat_map(DataDogSeries::new)
        .collect_vec();
    if gzip {
        split_and_compress_series(&series)
    } else {
        split_series(&series)
    }
}

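/// Recursively bisects the series until each serialized JSON body fits
/// under `MAX_PAYLOAD_BYTES`.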
fn split_series(series: &[DataDogSeries]) -> Result<Vec<Vec<u8>>> {
    let body = serde_json::to_vec(&DataDogApiPost { series })?;
    if body.len() < MAX_PAYLOAD_BYTES || series.len() <= 1 {
        // A single series that still exceeds the limit cannot be split any
        // further; send it as-is rather than recursing forever.
        Ok(vec![body])
    } else {
        let (left, right) = series.split_at(series.len() / 2);
        Ok(split_series(left)?
            .into_iter()
            .chain(split_series(right)?)
            .collect_vec())
    }
}

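/// Gzip-compresses the serialized series, recursively bisecting whenever the
/// decompressed body exceeds `MAX_DECOMPRESSED_PAYLOAD` or the compressed
/// body exceeds `MAX_PAYLOAD_BYTES`.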
fn split_and_compress_series(series: &[DataDogSeries]) -> Result<Vec<Vec<u8>>> {
    fn split(series: &[DataDogSeries]) -> Result<Vec<Vec<u8>>> {
        let (left, right) = series.split_at(series.len() / 2);
        Ok(split_and_compress_series(left)?
            .into_iter()
            .chain(split_and_compress_series(right)?)
            .collect_vec())
    }

    let body = serde_json::to_vec(&DataDogApiPost { series })?;
    if body.len() > MAX_DECOMPRESSED_PAYLOAD && series.len() > 1 {
        split(series)
    } else {
        let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
        encoder.write_all(&body)?;
        let compressed = encoder.finish()?;
        if compressed.len() < MAX_PAYLOAD_BYTES || series.len() <= 1 {
            // A single series that still exceeds the limit cannot be split
            // any further; send it as-is rather than recursing forever.
            Ok(vec![compressed])
        } else {
            split(series)
        }
    }
}

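/// Exporter that drains a `metrics` registry and writes the collected values
/// to stdout and/or the DataDog API.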
pub struct DataDogExporter {
    registry: Arc<Registry<Key, AtomicStorage>>,
    write_to_stdout: bool,
    write_to_api: bool,
    api_host: String,
    api_client: Option<Client>,
    api_key: Option<String>,
    tags: Vec<Label>,
    gzip: bool,
}

impl DataDogExporter {
    pub(crate) fn new(
        registry: Arc<Registry<Key, AtomicStorage>>,
        client: Option<Client>,
        config: DataDogConfig,
    ) -> Self {
        DataDogExporter {
            registry,
            write_to_stdout: config.write_to_stdout,
            write_to_api: config.write_to_api,
            api_host: config.api_host,
            api_client: client,
            api_key: config.api_key,
            tags: config.tags,
            gzip: config.gzip,
        }
    }

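    /// Spawns a background task that flushes collected metrics every
    /// `interval` and returns the shared exporter together with the task's
    /// `JoinHandle`. A minimal usage sketch (assuming an exporter built
    /// elsewhere):
    ///
    /// ```ignore
    /// let (exporter, handle) = exporter.schedule(Duration::from_secs(10));
    /// ```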
    pub fn schedule(self, interval: Duration) -> (Arc<Self>, JoinHandle<()>) {
        let exporter = Arc::new(self);
        let scheduled_exporter = exporter.clone();
        // Note: the interval is truncated to whole seconds.
        let job = every(interval.as_secs() as u32).seconds().perform(move || {
            let exporter = scheduled_exporter.clone();
            async move {
                if let Err(e) = exporter.flush().await {
                    warn!(error = ?e, "Failed to flush metrics");
                }
            }
        });
        (exporter, spawn(job))
    }

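    /// Drains all counter, gauge, and histogram handles from the registry,
    /// grouping observed values by key, then clears the registry.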
    pub fn collect(&self) -> Vec<DataDogMetric> {
        let counters = self
            .registry
            .get_counter_handles()
            .into_iter()
            .group_by(|(k, _)| k.clone())
            .into_iter()
            .map(|(key, values)| {
                DataDogMetric::from_counter(
                    key,
                    values.into_iter().map(|(_, v)| v).collect_vec(),
                    &self.tags,
                )
            })
            .collect_vec();

        let gauges = self
            .registry
            .get_gauge_handles()
            .into_iter()
            .group_by(|(k, _)| k.clone())
            .into_iter()
            .map(|(key, values)| {
                DataDogMetric::from_gauge(
                    key,
                    values.into_iter().map(|(_, v)| v).collect_vec(),
                    &self.tags,
                )
            })
            .collect_vec();

        let histograms = self
            .registry
            .get_histogram_handles()
            .into_iter()
            .group_by(|(k, _)| k.clone())
            .into_iter()
            .map(|(key, values)| {
                DataDogMetric::from_histogram(
                    key,
                    values.into_iter().map(|(_, v)| v).collect_vec(),
                    &self.tags,
                )
            })
            .collect_vec();

        self.registry.clear();

        counters
            .into_iter()
            .chain(gauges)
            .chain(histograms)
            .collect_vec()
    }

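    /// Collects all pending metrics and writes them to the configured
    /// destinations (stdout and/or the DataDog API).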
    pub async fn flush(&self) -> Result<()> {
        let metrics: Vec<DataDogMetric> = self.collect();
        debug!("Flushing {} metrics", metrics.len());

        if self.write_to_stdout {
            self.write_to_stdout(metrics.as_slice())?;
        }

        if self.write_to_api {
            self.write_to_api(metrics).await?;
        }

        Ok(())
    }

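    /// Prints each metric line as a JSON object on its own stdout line.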
    fn write_to_stdout(&self, metrics: &[DataDogMetric]) -> Result<()> {
        for metric in metrics {
            for m in metric.to_metric_lines() {
                println!("{}", serde_json::to_string(&m)?);
            }
        }
        Ok(())
    }

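    /// Sends collected metrics to the DataDog API. Panics if the exporter
    /// was configured to write to the API without an API client or key.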
    async fn write_to_api(&self, metrics: Vec<DataDogMetric>) -> Result<(), Error> {
        send_async(
            metrics,
            self.gzip,
            &self.api_host,
            self.api_key
                .as_ref()
                .expect("api_key must be set when write_to_api is enabled"),
            self.api_client
                .as_ref()
                .expect("api_client must be set when write_to_api is enabled"),
        )
        .await
    }
}

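// Best-effort final flush on drop, so metrics recorded since the last
// scheduled flush are not lost.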
impl Drop for DataDogExporter {
    fn drop(&mut self) {
        let metrics = self.collect();
        if self.write_to_stdout {
            if let Err(e) = self.write_to_stdout(metrics.as_slice()) {
                eprintln!("Failed to flush to stdout: {}", e);
            }
        }

        if self.write_to_api {
            let host = self.api_host.to_string();
            let api_key = self.api_key.as_ref().unwrap().to_string();
            let compression = self.gzip;
            // Flush on a dedicated thread with a blocking client, since drop
            // may run outside of (or while blocking) an async runtime.
            let joined = std::thread::spawn(move || {
                send_blocking(
                    metrics,
                    compression,
                    host,
                    api_key,
                    blocking::Client::default(),
                )
            })
            .join();

            match joined {
                Ok(Ok(())) => (),
                Ok(Err(e)) => eprintln!("Failed to flush metrics in drop: {}", e),
                Err(_) => eprintln!("Failed to join flush thread in drop"),
            }
        }
    }
}