1use async_recursion::async_recursion;
2use async_trait::async_trait;
3use chrono::Utc;
4use log_tracing_layer::{Log, LogEvent, LogIngestor};
5
6#[cfg(not(feature = "tls"))]
7use reqwest_default as reqwest;
8
9#[cfg(feature = "tls")]
10use reqwest_tls as reqwest;
11
12use serde_json::json;
13use std::{collections::VecDeque, error::Error, io::Write, sync::Arc, time::Duration};
14use tokio::sync::RwLock;
15
/// Value set as the `ddsource` attribute on every ingested log.
const DD_SOURCE: &str = "dd-tracing-layer";
/// Maximum number of log events drained and sent per batch in `try_send`.
const MAX_BATCH_SIZE: usize = 1000;
/// Background flush period and maximum batch age, in seconds.
const MAX_BATCH_DURATION_SECS: i64 = 5;
/// Maximum retry attempts in `send_logs` before a batch is dropped.
const MAX_RETRIES: u8 = 3;
20
/// Datadog site (region) the logs are shipped to; selects the regional
/// HTTP intake endpoint (see the URL match in `DatadogLogIngestor::new`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Region {
    /// US1 (`http-intake.logs.datadoghq.com`); also the default when no region is set.
    US1,
    /// US3 (`http-intake.logs.us3.datadoghq.com`).
    US3,
    /// US5 (`http-intake.logs.us5.datadoghq.com`).
    US5,
    /// US1-FED (`http-intake.logs.ddog-gov.com`).
    US1FED,
    /// EU (`http-intake.logs.datadoghq.eu`).
    EU,
    /// AP1 (`http-intake.logs.ap1.datadoghq.com`).
    AP1,
    /// AP2 (`http-intake.logs.ap2.datadoghq.com`).
    AP2,
}
31
32#[derive(Debug, Default)]
33pub struct DatadogOptions {
34 pub api_key: String,
35 pub service_name: String,
36 pub region: Option<Region>,
37 pub url: Option<String>,
38 pub tags: Option<String>,
39}
40
41impl DatadogOptions {
42 pub fn new(service_name: impl Into<String>, api_key: impl Into<String>) -> Self {
43 Self {
44 service_name: service_name.into(),
45 api_key: api_key.into(),
46 ..Default::default()
47 }
48 }
49
50 #[must_use]
51 pub const fn with_region(mut self, region: Region) -> Self {
52 self.region = Some(region);
53 self
54 }
55
56 #[must_use]
57 pub fn with_tags(mut self, tags: impl Into<String>) -> Self {
58 self.tags = Some(tags.into());
59 self
60 }
61
62 #[must_use]
63 pub fn with_url(mut self, url: impl Into<String>) -> Self {
64 self.url = Some(url.into());
65 self
66 }
67}
68
69#[derive(Debug, Default)]
70pub struct DatadogLogIngestor {
71 url: String,
72 service_name: String,
73 api_key: String,
74 tags: String,
75 client: reqwest::Client,
76 queue: Arc<RwLock<VecDeque<LogEvent>>>,
77}
78
79impl DatadogLogIngestor {
80 pub fn new(options: DatadogOptions) -> Self {
81 let url = options.url.unwrap_or_else(|| {
83 match options.region {
84 Some(Region::US1) | None => "https://http-intake.logs.datadoghq.com/api/v2/logs",
85 Some(Region::US3) => "https://http-intake.logs.us3.datadoghq.com/api/v2/logs",
86 Some(Region::US5) => "https://http-intake.logs.us5.datadoghq.com/api/v2/logs",
87 Some(Region::US1FED) => "https://http-intake.logs.ddog-gov.com/api/v2/logs",
88 Some(Region::EU) => "https://http-intake.logs.datadoghq.eu/api/v2/logs",
89 Some(Region::AP1) => "https://http-intake.logs.ap1.datadoghq.com/api/v2/logs",
90 Some(Region::AP2) => "https://http-intake.logs.ap2.datadoghq.com/api/v2/logs",
91 }
92 .to_string()
93 });
94
95 let source_tags = &format!("source-version:{}", env!("CARGO_PKG_VERSION"));
96
97 let tags = options
98 .tags
99 .map_or_else(|| source_tags.into(), |t| format!("{t}, {source_tags}"));
100
101 Self {
102 url,
103 service_name: options.service_name,
104 api_key: options.api_key,
105 tags,
106 client: reqwest::Client::new(),
107 queue: Arc::new(RwLock::new(VecDeque::new())),
108 }
109 }
110
111 fn compress(logs: &[Log]) -> Result<Vec<u8>, Box<dyn Error>> {
112 let bytes = serde_json::to_vec(&logs)?;
113 let mut encoder = libflate::gzip::Encoder::new(Vec::new())?;
114 encoder.write_all(&bytes)?;
115 let result = encoder.finish().into_result()?;
116 Ok(result)
117 }
118
119 #[async_recursion]
120 async fn send_logs(&self, logs: &[Log], retries: u8) {
121 if retries > MAX_RETRIES {
122 eprintln!("Failed to send logs after {retries} retries");
123 return;
124 }
125
126 let retry = || async {
127 let next = retries + 1;
128 let next_time = 100 * u64::from(next);
129 tokio::time::sleep(Duration::from_millis(next_time)).await;
130 self.send_logs(logs, next).await;
131 };
132
133 let compressed_logs = match Self::compress(logs) {
135 Ok(logs) => logs,
136 Err(e) => {
137 eprintln!("Failed to compress logs: {e:?}");
138 return;
139 }
140 };
141
142 match self
144 .client
145 .post(&self.url)
146 .header("User-Agent", "dd-tracing-subscriber/0.1.0")
147 .header("DD-API-KEY", &self.api_key)
148 .header("Content-Type", "application/json")
149 .header("Content-Encoding", "gzip")
150 .body(compressed_logs)
151 .send()
152 .await
153 {
154 Ok(res) => match res.status().as_u16() {
155 202 => {
156 }
158 400 => {
159 eprintln!("Bad request (likely an issue in the payload formatting)");
160 }
161 401 => {
162 eprintln!("Unauthorized (likely a missing API Key)");
163 }
164 403 => {
165 eprintln!("Permission issue (likely using an invalid API Key)");
166 }
167 408 => {
168 eprintln!("Request Timeout, request should be retried after some time");
169 retry().await;
170 }
171 413 => {
172 eprintln!("Payload too large (batch is above 5MB uncompressed)");
173 let logs_len = logs.len();
175 let half = logs_len / 2;
176 let (left, right) = logs.split_at(half);
177 self.send_logs(left, retries + 1).await;
178 self.send_logs(right, retries + 1).await;
179 }
180 429 => {
181 eprintln!("Too Many Requests, request should be retried after some time");
182 retry().await;
183 }
184 500 => {
185 eprintln!("Internal Server Error, the server encountered an unexpected condition that prevented it from fulfilling the request, request should be retried after some time");
186 retry().await;
187 }
188 503 => {
189 eprintln!("Service Unavailable, the server is not ready to handle the request probably because it is overloaded, request should be retried after some time");
190 retry().await;
191 }
192 _ => {
193 eprintln!("Unknown error, try again later");
194 retry().await;
195 }
196 },
197 Err(e) => {
198 eprintln!("Failed to send logs to Datadog: {e:?}");
199 }
200 }
201 }
202
203 #[async_recursion]
204 async fn try_send(&mut self, is_flush: bool) {
205 {
206 let queue = self.queue.read().await;
208 if queue.is_empty() {
209 return;
210 }
211 if !is_flush {
212 let last_log = queue.back().unwrap();
215 let now = Utc::now();
216 let diff = now - last_log.received_at;
217 if diff < chrono::Duration::seconds(MAX_BATCH_DURATION_SECS)
218 && queue.len() < MAX_BATCH_SIZE
219 {
220 return;
221 }
222 }
223 }
224
225 let logs = {
227 let mut queue = self.queue.write().await;
228 let tail = usize::min(queue.len(), MAX_BATCH_SIZE);
230 queue.drain(..tail).map(|e| e.log).collect::<Vec<_>>()
231 };
232
233 self.send_logs(&logs, 0).await;
235
236 let is_queue_empty = { self.queue.read().await.is_empty() };
238 if !is_queue_empty {
239 self.try_send(is_flush).await;
240 }
241 }
242}
243
244impl Clone for DatadogLogIngestor {
245 fn clone(&self) -> Self {
246 Self {
247 url: self.url.clone(),
248 service_name: self.service_name.clone(),
249 api_key: self.api_key.clone(),
250 tags: self.tags.clone(),
251 client: self.client.clone(),
252 queue: self.queue.clone(),
253 }
254 }
255}
256
257#[async_trait]
258impl LogIngestor for DatadogLogIngestor {
259 fn name(&self) -> &'static str {
260 "datadog"
261 }
262
263 fn start(&self) {
264 let mut this = self.clone();
266 tokio::spawn(async move {
267 let period = Duration::from_secs(MAX_BATCH_DURATION_SECS as u64);
268 let mut interval = tokio::time::interval(period);
269 loop {
270 interval.tick().await;
271 this.try_send(false).await;
272 }
273 });
274 }
275
276 async fn ingest(&mut self, mut log: Log) {
277 log.insert("ddsource".to_string(), json!(DD_SOURCE));
279 log.insert("ddtags".to_string(), json!(self.tags));
280 log.insert("service".to_string(), json!(self.service_name));
281 let log_event = LogEvent {
282 log,
283 received_at: Utc::now(),
284 };
285 self.queue.write().await.push_back(log_event);
286 }
287
288 async fn flush(&mut self) {
289 self.try_send(true).await;
290 }
291}