// datadog_apm/client.rs

1use hyper::{Body, Method, Request};
2
3use hyper::client::connect::HttpConnector;
4use rmp::encode;
5use serde::Serialize;
6use tokio::sync::mpsc;
7
8use std::collections::HashMap;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
/// Handle used to enqueue traces for delivery to the Datadog agent.
///
/// Cloning the handle clones every field, including the queue sender and the HTTP client.
#[derive(Debug, Clone)]
pub struct Client {
    // Optional environment name; when set it is written to each span's `meta` map under `env`.
    env: Option<String>,
    // URL the trace batches are POSTed to, built from the configured host and port.
    endpoint: String,
    // Service name copied into every serialized span.
    service: String,
    // HTTP client used to send the msgpack payloads.
    http_client: hyper::Client<HttpConnector>,
    // Sending half of the bounded queue that the background consumer task drains.
    buffer_sender: mpsc::Sender<Trace>,
    // Number of buffered traces that triggers a flush.
    buffer_size: usize,
    // Interval after which a non-empty buffer is flushed even if it is not full; the consumer
    // task also sleeps for this long when the queue has nothing to read.
    buffer_flush_max_interval: Duration,
}
21
/// Configuration settings for the client.
#[derive(Debug)]
pub struct Config {
    /// Datadog APM service name.
    pub service: String,
    /// Datadog APM environment.
    pub env: Option<String>,
    /// Datadog agent host/IP, defaults to `localhost`.
    pub host: String,
    /// Datadog agent port, defaults to `8126`.
    pub port: String,
    /// Client buffer queue capacity, defaults to `std::u16::MAX`.
    /// It limits the number of traces queued in memory before new traces are dropped. The client
    /// is expected to send the queued traces before the queue fills up, so you usually don't
    /// need to change this value.
    pub buffer_queue_capacity: u16,
    /// The buffer size, defaults to 200. It is the number of traces sent in a single request to
    /// the Datadog agent.
    pub buffer_size: u16,
    /// The buffer flush maximum interval, defaults to 200 ms. It is the maximum time traces are
    /// buffered before being sent when the buffer has not reached `buffer_size`.
    pub buffer_flush_max_interval: Duration,
}

impl Default for Config {
    /// Returns the defaults documented on each field: agent at `localhost:8126`, empty service
    /// name, no environment, a queue of `std::u16::MAX` traces, batches of 200 traces and a
    /// 200 ms flush interval.
    fn default() -> Self {
        Config {
            env: None,
            host: "localhost".to_string(),
            port: "8126".to_string(),
            service: String::new(),
            buffer_queue_capacity: std::u16::MAX,
            buffer_size: 200,
            buffer_flush_max_interval: Duration::from_millis(200),
        }
    }
}
55
56impl Client {
57    pub fn new(config: Config) -> Client {
58        let (buffer_sender, buffer_receiver) = mpsc::channel(config.buffer_queue_capacity as usize);
59
60        let client = Client {
61            env: config.env,
62            service: config.service,
63            endpoint: format!("http://{}:{}/v0.3/traces", config.host, config.port),
64            http_client: hyper::Client::new(),
65            buffer_sender: buffer_sender,
66            buffer_size: config.buffer_size as usize,
67            buffer_flush_max_interval: config.buffer_flush_max_interval,
68        };
69
70        spawn_consume_buffer_task(buffer_receiver, client.clone());
71
72        client
73    }
74
75    pub fn send_trace(mut self, trace: Trace) {
76        match self.buffer_sender.try_send(trace) {
77            Ok(_) => trace!("trace enqueued"),
78            Err(err) => warn!("could not enqueue trace: {:?}", err),
79        };
80    }
81
82    async fn send_traces(self, traces: Vec<Trace>) {
83        let traces = traces
84            .iter()
85            .map(|trace| map_to_raw_spans(trace, self.env.clone(), self.service.clone()))
86            .collect::<Vec<Vec<RawSpan>>>();
87
88        let trace_count = traces.len();
89        let payload = serialize_as_msgpack(traces);
90
91        let req = Request::builder()
92            .method(Method::POST)
93            .uri(self.endpoint)
94            .header("content-type", "application/msgpack")
95            .header("content-length", payload.len())
96            .header("X-Datadog-Trace-Count", trace_count)
97            .body(Body::from(payload))
98            .unwrap();
99
100        match self.http_client.request(req).await {
101            Ok(resp) => {
102                if resp.status().is_success() {
103                    trace!("{} traces sent to datadog", trace_count)
104                } else {
105                    error!("error sending traces to datadog: {:?}", resp)
106                }
107            }
108            Err(err) => error!("error sending traces to datadog: {:?}", err),
109        }
110    }
111}
112
/// A trace: a group of spans that share one trace id.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Trace {
    /// Trace id, written to every span of the trace as `trace_id`.
    pub id: u64,
    /// Spans that belong to this trace.
    pub spans: Vec<Span>,
    /// Sampling priority, reported on every span as the `_sampling_priority_v1` metric.
    pub priority: u32,
}

/// A single unit of work inside a [`Trace`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Span {
    /// Span id.
    pub id: u64,
    /// Span name.
    pub name: String,
    /// Resource the span refers to.
    pub resource: String,
    /// Id of the parent span, if any.
    pub parent_id: Option<u64>,
    /// Moment the span started; sent as nanoseconds since the Unix epoch.
    pub start: SystemTime,
    /// How long the span lasted; sent as nanoseconds.
    pub duration: Duration,
    /// Error details; when present the span is flagged as an error and the `error.*` meta
    /// entries are filled in.
    pub error: Option<ErrorInfo>,
    /// HTTP details; when present the `http.*` meta entries are filled in.
    pub http: Option<HttpInfo>,
    /// SQL details; when present the `sql.*` meta entries are filled in.
    pub sql: Option<SqlInfo>,
    /// Span type.
    pub r#type: String,
    /// Extra tags copied into the span's meta map; they are written last, so a tag replaces a
    /// generated entry that uses the same key.
    pub tags: HashMap<String, String>,
}

/// Error details attached to a [`Span`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ErrorInfo {
    /// Sent as `error.type`.
    pub r#type: String,
    /// Sent as `error.msg`.
    pub msg: String,
    /// Sent as `error.stack`.
    pub stack: String,
}

/// HTTP details attached to a [`Span`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HttpInfo {
    /// Sent as `http.url`.
    pub url: String,
    /// Sent as `http.status_code`.
    pub status_code: String,
    /// Sent as `http.method`.
    pub method: String,
}

/// SQL details attached to a [`Span`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SqlInfo {
    /// Sent as `sql.query`.
    pub query: String,
    /// Sent as `sql.rows`.
    pub rows: String,
    /// Sent as `sql.db`.
    pub db: String,
}
155
/// Span in the shape that is serialized to msgpack and sent to the agent.
#[derive(Debug, Serialize, Clone, PartialEq)]
struct RawSpan {
    // Service name taken from the client configuration.
    service: String,
    name: String,
    resource: String,
    // Id of the trace this span belongs to.
    trace_id: u64,
    span_id: u64,
    parent_id: Option<u64>,
    // Start time in nanoseconds since the Unix epoch.
    start: u64,
    // Duration in nanoseconds.
    duration: u64,
    // 1 when the span carries error information, 0 otherwise.
    error: i32,
    // String tags: `env`, `http.*`, `error.*`, `sql.*` and the user-supplied tags.
    meta: HashMap<String, String>,
    // Numeric tags; holds the `_sampling_priority_v1` entry.
    metrics: HashMap<String, f64>,
    r#type: String,
}
171
172fn spawn_consume_buffer_task(mut buffer_receiver: mpsc::Receiver<Trace>, client: Client) {
173    tokio::spawn(async move {
174        let mut buffer = Vec::with_capacity(client.buffer_size);
175        let mut last_flushed_at = SystemTime::now();
176        loop {
177            let client = client.clone();
178
179            match buffer_receiver.try_recv() {
180                Ok(trace) => {
181                    buffer.push(trace);
182                }
183                Err(_) => {
184                    tokio::time::delay_for(client.buffer_flush_max_interval).await;
185                }
186            }
187
188            if buffer.len() == client.buffer_size
189                || flush_max_interval_has_passed(&buffer, &client, last_flushed_at)
190            {
191                client.send_traces(buffer.drain(..).collect()).await;
192                last_flushed_at = SystemTime::now();
193            }
194        }
195
196        fn flush_max_interval_has_passed<T>(
197            buffer: &Vec<T>,
198            client: &Client,
199            last_flushed_at: SystemTime,
200        ) -> bool {
201            buffer.len() > 0
202                && SystemTime::now().duration_since(last_flushed_at).unwrap()
203                    > client.buffer_flush_max_interval
204        }
205    });
206}
207
208fn serialize_as_msgpack(traces: Vec<Vec<RawSpan>>) -> Vec<u8> {
209    // this function uses a hack over rpm_serde library,
210    // because the lib does not work when the struct is wrapped in a array,
211    // so it manually encode the array, and then serialize each entity in a loop
212
213    let mut buf = Vec::new();
214
215    encode::write_array_len(&mut buf, traces.len() as u32).unwrap();
216    for spans in traces {
217        encode::write_array_len(&mut buf, spans.len() as u32).unwrap();
218        for span in spans {
219            let mut se = rmps::Serializer::new(&mut buf).with_struct_map();
220            span.serialize(&mut se).unwrap();
221        }
222    }
223    buf
224}
225
226fn fill_meta(span: &Span, env: Option<String>) -> HashMap<String, String> {
227    let mut meta = HashMap::new();
228    if let Some(env) = env {
229        meta.insert("env".to_string(), env);
230    }
231
232    if let Some(http) = &span.http {
233        meta.insert("http.status_code".to_string(), http.status_code.clone());
234        meta.insert("http.method".to_string(), http.method.clone());
235        meta.insert("http.url".to_string(), http.url.clone());
236    }
237    if let Some(error) = &span.error {
238        meta.insert("error.type".to_string(), error.r#type.clone());
239        meta.insert("error.msg".to_string(), error.msg.clone());
240        meta.insert("error.stack".to_string(), error.stack.clone());
241    }
242    if let Some(sql) = &span.sql {
243        meta.insert("sql.query".to_string(), sql.query.clone());
244        meta.insert("sql.rows".to_string(), sql.rows.clone());
245        meta.insert("sql.db".to_string(), sql.db.clone());
246    }
247    for (key, value) in &span.tags {
248        meta.insert(key.to_string(), value.to_string());
249    }
250    meta
251}
252
/// Builds the numeric tag map of a span: a single `_sampling_priority_v1` entry holding
/// `priority` as a float.
fn fill_metrics(priority: u32) -> HashMap<String, f64> {
    let sampling_priority = ("_sampling_priority_v1".to_string(), f64::from(priority));
    std::iter::once(sampling_priority).collect()
}
258
259fn map_to_raw_spans(trace: &Trace, env: Option<String>, service: String) -> Vec<RawSpan> {
260    let mut traces = Vec::new();
261    for span in &trace.spans {
262        traces.push(RawSpan {
263            service: service.clone(),
264            trace_id: trace.id,
265            span_id: span.id,
266            name: span.name.clone(),
267            resource: span.resource.clone(),
268            parent_id: span.parent_id,
269            start: duration_to_nanos(span.start.duration_since(UNIX_EPOCH).unwrap()),
270            duration: duration_to_nanos(span.duration),
271            error: if span.error.is_some() { 1 } else { 0 },
272            r#type: span.r#type.clone(),
273            meta: fill_meta(&span, env.clone()),
274            metrics: fill_metrics(trace.priority),
275        });
276    }
277    traces
278}
279
/// Converts `duration` to whole nanoseconds.
///
/// Durations too long to fit in a `u64` (roughly 584 years and up) saturate at `u64::MAX`
/// instead of overflowing.
fn duration_to_nanos(duration: Duration) -> u64 {
    duration
        .as_secs()
        .saturating_mul(1_000_000_000)
        .saturating_add(u64::from(duration.subsec_nanos()))
}
283
#[cfg(test)]
mod tests {
    extern crate rand;

    use super::*;

    use rand::Rng;
    use serde_json::json;

    // Smoke test without assertions: builds a client with the default agent address and
    // enqueues one trace. Ignored by default (presumably because it needs a reachable agent).
    #[tokio::test]
    #[ignore]
    async fn test_send_trace() {
        let client = Client::new(Config {
            service: String::from("service_name"),
            ..Default::default()
        });
        client.send_trace(a_trace());
    }

    // The mapped spans must carry the configured service and env plus the span's http details.
    #[tokio::test]
    async fn test_map_to_raw_spans() {
        let config = Config {
            service: String::from("service_name"),
            env: Some(String::from("staging")),
            ..Default::default()
        };
        let trace = a_trace();

        let expected: Vec<RawSpan> = trace
            .spans
            .iter()
            .map(|span| {
                let mut meta = HashMap::new();
                meta.insert("env".to_string(), config.env.clone().unwrap());
                if let Some(http) = span.http.as_ref() {
                    meta.insert("http.url".to_string(), http.url.clone());
                    meta.insert("http.method".to_string(), http.method.clone());
                    meta.insert("http.status_code".to_string(), http.status_code.clone());
                }

                let metrics: HashMap<String, f64> = std::iter::once((
                    "_sampling_priority_v1".to_string(),
                    f64::from(trace.priority),
                ))
                .collect();

                RawSpan {
                    trace_id: trace.id,
                    span_id: span.id,
                    parent_id: span.parent_id,
                    name: span.name.clone(),
                    resource: span.resource.clone(),
                    service: config.service.clone(),
                    r#type: span.r#type.clone(),
                    start: duration_to_nanos(span.start.duration_since(UNIX_EPOCH).unwrap()),
                    duration: duration_to_nanos(span.duration),
                    error: 0,
                    meta,
                    metrics,
                }
            })
            .collect();

        let actual = map_to_raw_spans(&trace, config.env, config.service);

        assert_eq!(actual, expected);
    }

    // Decoding the msgpack payload back into JSON must give the same value as serializing the
    // spans to JSON directly.
    #[tokio::test]
    async fn test_message_pack_serialization() {
        fn random_raw_span() -> RawSpan {
            let mut rng = rand::thread_rng();
            let epoch_secs = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_secs();
            RawSpan {
                trace_id: rng.gen::<u64>(),
                span_id: rng.gen::<u64>(),
                parent_id: None,
                name: String::from("request"),
                resource: String::from("/home"),
                service: String::from("service_name"),
                r#type: String::from("web"),
                start: epoch_secs * 1_000_000_000,
                duration: 4853472865,
                error: 0,
                meta: std::collections::HashMap::new(),
                metrics: std::collections::HashMap::new(),
            }
        }

        let traces: Vec<Vec<RawSpan>> = std::iter::repeat_with(|| vec![random_raw_span()])
            .take(3)
            .collect();
        let encoded = serialize_as_msgpack(traces.clone());

        let decoded: serde_json::Value = rmp_serde::from_read_ref(&encoded).unwrap();

        // debugging utility:
        //serde_json::to_writer_pretty(std::io::stdout(), &decoded).unwrap();

        assert_eq!(decoded, json!(traces));
    }

    // Builds a one-span trace with random ids and http details for a successful GET.
    fn a_trace() -> Trace {
        let mut rng = rand::thread_rng();
        let trace_id = rng.gen::<u64>();
        let span = Span {
            id: rng.gen::<u64>(),
            name: String::from("request"),
            resource: String::from("/home/v3"),
            r#type: String::from("web"),
            start: SystemTime::now(),
            duration: Duration::from_secs(2),
            parent_id: None,
            http: Some(HttpInfo {
                url: String::from("/home/v3/2?trace=true"),
                method: String::from("GET"),
                status_code: String::from("200"),
            }),
            error: None,
            sql: None,
            tags: HashMap::new(),
        };

        Trace {
            id: trace_id,
            priority: 1,
            spans: vec![span],
        }
    }
}