// dd_tracing_layer — datadog_ingestor.rs
use async_recursion::async_recursion;
use async_trait::async_trait;
use chrono::Utc;
use log_tracing_layer::{Log, LogEvent, LogIngestor};
use serde_json::json;
use std::{collections::VecDeque, error::Error, io::Write, sync::Arc, time::Duration};
use tokio::sync::RwLock;

// Select the reqwest flavor according to the `tls` feature flag.
#[cfg(not(feature = "tls"))]
use reqwest_default as reqwest;
#[cfg(feature = "tls")]
use reqwest_tls as reqwest;
15
/// Value of the `ddsource` attribute attached to every log.
const DD_SOURCE: &str = "dd-tracing-layer";
/// Maximum number of logs shipped in a single intake request.
const MAX_BATCH_SIZE: usize = 1000;
/// Maximum age of the newest queued log before a batch is forced out
/// (typed `i64` because it is fed to `chrono::Duration::seconds`).
const MAX_BATCH_DURATION_SECS: i64 = 5;
/// Upper bound on retry attempts for a single batch in `send_logs`.
const MAX_RETRIES: u8 = 3;
20
/// Datadog site (region) used to select the log-intake endpoint.
///
/// `Hash` is derived alongside `Eq` so the region can be used as a map/set
/// key; the type is `Copy`, so passing it by value is free.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Region {
    /// US1 (`datadoghq.com`) — the default site.
    US1,
    /// US3 (`us3.datadoghq.com`).
    US3,
    /// US5 (`us5.datadoghq.com`).
    US5,
    /// US1-FED (`ddog-gov.com`).
    US1FED,
    /// EU (`datadoghq.eu`).
    EU,
    /// AP1 (`ap1.datadoghq.com`).
    AP1,
    /// AP2 (`ap2.datadoghq.com`).
    AP2,
}
31
32#[derive(Debug, Default)]
33pub struct DatadogOptions {
34    pub api_key: String,
35    pub service_name: String,
36    pub region: Option<Region>,
37    pub url: Option<String>,
38    pub tags: Option<String>,
39}
40
41impl DatadogOptions {
42    pub fn new(service_name: impl Into<String>, api_key: impl Into<String>) -> Self {
43        Self {
44            service_name: service_name.into(),
45            api_key: api_key.into(),
46            ..Default::default()
47        }
48    }
49
50    #[must_use]
51    pub const fn with_region(mut self, region: Region) -> Self {
52        self.region = Some(region);
53        self
54    }
55
56    #[must_use]
57    pub fn with_tags(mut self, tags: impl Into<String>) -> Self {
58        self.tags = Some(tags.into());
59        self
60    }
61
62    #[must_use]
63    pub fn with_url(mut self, url: impl Into<String>) -> Self {
64        self.url = Some(url.into());
65        self
66    }
67}
68
69#[derive(Debug, Default)]
70pub struct DatadogLogIngestor {
71    url: String,
72    service_name: String,
73    api_key: String,
74    tags: String,
75    client: reqwest::Client,
76    queue: Arc<RwLock<VecDeque<LogEvent>>>,
77}
78
79impl DatadogLogIngestor {
80    pub fn new(options: DatadogOptions) -> Self {
81        // https://docs.datadoghq.com/logs/log_collection/?tab=serverless#supported-endpoints
82        let url = options.url.unwrap_or_else(|| {
83            match options.region {
84                Some(Region::US1) | None => "https://http-intake.logs.datadoghq.com/api/v2/logs",
85                Some(Region::US3) => "https://http-intake.logs.us3.datadoghq.com/api/v2/logs",
86                Some(Region::US5) => "https://http-intake.logs.us5.datadoghq.com/api/v2/logs",
87                Some(Region::US1FED) => "https://http-intake.logs.ddog-gov.com/api/v2/logs",
88                Some(Region::EU) => "https://http-intake.logs.datadoghq.eu/api/v2/logs",
89                Some(Region::AP1) => "https://http-intake.logs.ap1.datadoghq.com/api/v2/logs",
90                Some(Region::AP2) => "https://http-intake.logs.ap2.datadoghq.com/api/v2/logs",
91            }
92            .to_string()
93        });
94
95        let source_tags = &format!("source-version:{}", env!("CARGO_PKG_VERSION"));
96
97        let tags = options
98            .tags
99            .map_or_else(|| source_tags.into(), |t| format!("{t}, {source_tags}"));
100
101        Self {
102            url,
103            service_name: options.service_name,
104            api_key: options.api_key,
105            tags,
106            client: reqwest::Client::new(),
107            queue: Arc::new(RwLock::new(VecDeque::new())),
108        }
109    }
110
111    fn compress(logs: &[Log]) -> Result<Vec<u8>, Box<dyn Error>> {
112        let bytes = serde_json::to_vec(&logs)?;
113        let mut encoder = libflate::gzip::Encoder::new(Vec::new())?;
114        encoder.write_all(&bytes)?;
115        let result = encoder.finish().into_result()?;
116        Ok(result)
117    }
118
119    #[async_recursion]
120    async fn send_logs(&self, logs: &[Log], retries: u8) {
121        if retries > MAX_RETRIES {
122            eprintln!("Failed to send logs after {retries} retries");
123            return;
124        }
125
126        let retry = || async {
127            let next = retries + 1;
128            let next_time = 100 * u64::from(next);
129            tokio::time::sleep(Duration::from_millis(next_time)).await;
130            self.send_logs(logs, next).await;
131        };
132
133        // compress the logs
134        let compressed_logs = match Self::compress(logs) {
135            Ok(logs) => logs,
136            Err(e) => {
137                eprintln!("Failed to compress logs: {e:?}");
138                return;
139            }
140        };
141
142        // https://docs.datadoghq.com/api/latest/logs/?code-lang=typescript
143        match self
144            .client
145            .post(&self.url)
146            .header("User-Agent", "dd-tracing-subscriber/0.1.0")
147            .header("DD-API-KEY", &self.api_key)
148            .header("Content-Type", "application/json")
149            .header("Content-Encoding", "gzip")
150            .body(compressed_logs)
151            .send()
152            .await
153        {
154            Ok(res) => match res.status().as_u16() {
155                202 => {
156                    // println!("Accepted: the request has been accepted for processing");
157                }
158                400 => {
159                    eprintln!("Bad request (likely an issue in the payload formatting)");
160                }
161                401 => {
162                    eprintln!("Unauthorized (likely a missing API Key)");
163                }
164                403 => {
165                    eprintln!("Permission issue (likely using an invalid API Key)");
166                }
167                408 => {
168                    eprintln!("Request Timeout, request should be retried after some time");
169                    retry().await;
170                }
171                413 => {
172                    eprintln!("Payload too large (batch is above 5MB uncompressed)");
173                    // split batch
174                    let logs_len = logs.len();
175                    let half = logs_len / 2;
176                    let (left, right) = logs.split_at(half);
177                    self.send_logs(left, retries + 1).await;
178                    self.send_logs(right, retries + 1).await;
179                }
180                429 => {
181                    eprintln!("Too Many Requests, request should be retried after some time");
182                    retry().await;
183                }
184                500 => {
185                    eprintln!("Internal Server Error, the server encountered an unexpected condition that prevented it from fulfilling the request, request should be retried after some time");
186                    retry().await;
187                }
188                503 => {
189                    eprintln!("Service Unavailable, the server is not ready to handle the request probably because it is overloaded, request should be retried after some time");
190                    retry().await;
191                }
192                _ => {
193                    eprintln!("Unknown error, try again later");
194                    retry().await;
195                }
196            },
197            Err(e) => {
198                eprintln!("Failed to send logs to Datadog: {e:?}");
199            }
200        }
201    }
202
203    #[async_recursion]
204    async fn try_send(&mut self, is_flush: bool) {
205        {
206            // send current logs to datadog if there are any
207            let queue = self.queue.read().await;
208            if queue.is_empty() {
209                return;
210            }
211            if !is_flush {
212                // send the logs only if the last one is more than 5 seconds old
213                // or if the queue has more than MAX_BATCH_SIZE logs
214                let last_log = queue.back().unwrap();
215                let now = Utc::now();
216                let diff = now - last_log.received_at;
217                if diff < chrono::Duration::seconds(MAX_BATCH_DURATION_SECS)
218                    && queue.len() < MAX_BATCH_SIZE
219                {
220                    return;
221                }
222            }
223        }
224
225        // get the logs to send
226        let logs = {
227            let mut queue = self.queue.write().await;
228            // max amount of logs to send at once is 1000
229            let tail = usize::min(queue.len(), MAX_BATCH_SIZE);
230            queue.drain(..tail).map(|e| e.log).collect::<Vec<_>>()
231        };
232
233        // send them (retries if it fails)
234        self.send_logs(&logs, 0).await;
235
236        // check if the queue is empty and flush again if it's not
237        let is_queue_empty = { self.queue.read().await.is_empty() };
238        if !is_queue_empty {
239            self.try_send(is_flush).await;
240        }
241    }
242}
243
244impl Clone for DatadogLogIngestor {
245    fn clone(&self) -> Self {
246        Self {
247            url: self.url.clone(),
248            service_name: self.service_name.clone(),
249            api_key: self.api_key.clone(),
250            tags: self.tags.clone(),
251            client: self.client.clone(),
252            queue: self.queue.clone(),
253        }
254    }
255}
256
257#[async_trait]
258impl LogIngestor for DatadogLogIngestor {
259    fn name(&self) -> &'static str {
260        "datadog"
261    }
262
263    fn start(&self) {
264        // start a timer that will flush the queue every n seconds
265        let mut this = self.clone();
266        tokio::spawn(async move {
267            let period = Duration::from_secs(MAX_BATCH_DURATION_SECS as u64);
268            let mut interval = tokio::time::interval(period);
269            loop {
270                interval.tick().await;
271                this.try_send(false).await;
272            }
273        });
274    }
275
276    async fn ingest(&mut self, mut log: Log) {
277        // add datadog specific fields
278        log.insert("ddsource".to_string(), json!(DD_SOURCE));
279        log.insert("ddtags".to_string(), json!(self.tags));
280        log.insert("service".to_string(), json!(self.service_name));
281        let log_event = LogEvent {
282            log,
283            received_at: Utc::now(),
284        };
285        self.queue.write().await.push_back(log_event);
286    }
287
288    async fn flush(&mut self) {
289        self.try_send(true).await;
290    }
291}