1use hyper::{Body, Method, Request};
2
3use hyper::client::connect::HttpConnector;
4use rmp::encode;
5use serde::Serialize;
6use tokio::sync::mpsc;
7
8use std::collections::HashMap;
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
11#[derive(Debug, Clone)]
12pub struct Client {
13 env: Option<String>,
14 endpoint: String,
15 service: String,
16 http_client: hyper::Client<HttpConnector>,
17 buffer_sender: mpsc::Sender<Trace>,
18 buffer_size: usize,
19 buffer_flush_max_interval: Duration,
20}
21
22#[derive(Debug)]
24pub struct Config {
25 pub service: String,
27 pub env: Option<String>,
29 pub host: String,
31 pub port: String,
33 pub buffer_queue_capacity: u16,
36 pub buffer_size: u16,
38 pub buffer_flush_max_interval: Duration,
40}
41
42impl Default for Config {
43 fn default() -> Self {
44 Config {
45 env: None,
46 host: "localhost".to_string(),
47 port: "8126".to_string(),
48 service: "".to_string(),
49 buffer_queue_capacity: std::u16::MAX,
50 buffer_size: 200,
51 buffer_flush_max_interval: Duration::from_millis(200),
52 }
53 }
54}
55
56impl Client {
57 pub fn new(config: Config) -> Client {
58 let (buffer_sender, buffer_receiver) = mpsc::channel(config.buffer_queue_capacity as usize);
59
60 let client = Client {
61 env: config.env,
62 service: config.service,
63 endpoint: format!("http://{}:{}/v0.3/traces", config.host, config.port),
64 http_client: hyper::Client::new(),
65 buffer_sender: buffer_sender,
66 buffer_size: config.buffer_size as usize,
67 buffer_flush_max_interval: config.buffer_flush_max_interval,
68 };
69
70 spawn_consume_buffer_task(buffer_receiver, client.clone());
71
72 client
73 }
74
75 pub fn send_trace(mut self, trace: Trace) {
76 match self.buffer_sender.try_send(trace) {
77 Ok(_) => trace!("trace enqueued"),
78 Err(err) => warn!("could not enqueue trace: {:?}", err),
79 };
80 }
81
82 async fn send_traces(self, traces: Vec<Trace>) {
83 let traces = traces
84 .iter()
85 .map(|trace| map_to_raw_spans(trace, self.env.clone(), self.service.clone()))
86 .collect::<Vec<Vec<RawSpan>>>();
87
88 let trace_count = traces.len();
89 let payload = serialize_as_msgpack(traces);
90
91 let req = Request::builder()
92 .method(Method::POST)
93 .uri(self.endpoint)
94 .header("content-type", "application/msgpack")
95 .header("content-length", payload.len())
96 .header("X-Datadog-Trace-Count", trace_count)
97 .body(Body::from(payload))
98 .unwrap();
99
100 match self.http_client.request(req).await {
101 Ok(resp) => {
102 if resp.status().is_success() {
103 trace!("{} traces sent to datadog", trace_count)
104 } else {
105 error!("error sending traces to datadog: {:?}", resp)
106 }
107 }
108 Err(err) => error!("error sending traces to datadog: {:?}", err),
109 }
110 }
111}
112
113#[derive(Debug, Clone)]
114pub struct Trace {
115 pub id: u64,
116 pub spans: Vec<Span>,
117 pub priority: u32,
118}
119
120#[derive(Debug, Clone)]
121pub struct Span {
122 pub id: u64,
123 pub name: String,
124 pub resource: String,
125 pub parent_id: Option<u64>,
126 pub start: SystemTime,
127 pub duration: Duration,
128 pub error: Option<ErrorInfo>,
129 pub http: Option<HttpInfo>,
130 pub sql: Option<SqlInfo>,
131 pub r#type: String,
132 pub tags: HashMap<String, String>,
133}
134
135#[derive(Debug, Clone)]
136pub struct ErrorInfo {
137 pub r#type: String,
138 pub msg: String,
139 pub stack: String,
140}
141
142#[derive(Debug, Clone)]
143pub struct HttpInfo {
144 pub url: String,
145 pub status_code: String,
146 pub method: String,
147}
148
149#[derive(Debug, Clone)]
150pub struct SqlInfo {
151 pub query: String,
152 pub rows: String,
153 pub db: String,
154}
155
156#[derive(Debug, Serialize, Clone, PartialEq)]
157struct RawSpan {
158 service: String,
159 name: String,
160 resource: String,
161 trace_id: u64,
162 span_id: u64,
163 parent_id: Option<u64>,
164 start: u64,
165 duration: u64,
166 error: i32,
167 meta: HashMap<String, String>,
168 metrics: HashMap<String, f64>,
169 r#type: String,
170}
171
172fn spawn_consume_buffer_task(mut buffer_receiver: mpsc::Receiver<Trace>, client: Client) {
173 tokio::spawn(async move {
174 let mut buffer = Vec::with_capacity(client.buffer_size);
175 let mut last_flushed_at = SystemTime::now();
176 loop {
177 let client = client.clone();
178
179 match buffer_receiver.try_recv() {
180 Ok(trace) => {
181 buffer.push(trace);
182 }
183 Err(_) => {
184 tokio::time::delay_for(client.buffer_flush_max_interval).await;
185 }
186 }
187
188 if buffer.len() == client.buffer_size
189 || flush_max_interval_has_passed(&buffer, &client, last_flushed_at)
190 {
191 client.send_traces(buffer.drain(..).collect()).await;
192 last_flushed_at = SystemTime::now();
193 }
194 }
195
196 fn flush_max_interval_has_passed<T>(
197 buffer: &Vec<T>,
198 client: &Client,
199 last_flushed_at: SystemTime,
200 ) -> bool {
201 buffer.len() > 0
202 && SystemTime::now().duration_since(last_flushed_at).unwrap()
203 > client.buffer_flush_max_interval
204 }
205 });
206}
207
208fn serialize_as_msgpack(traces: Vec<Vec<RawSpan>>) -> Vec<u8> {
209 let mut buf = Vec::new();
214
215 encode::write_array_len(&mut buf, traces.len() as u32).unwrap();
216 for spans in traces {
217 encode::write_array_len(&mut buf, spans.len() as u32).unwrap();
218 for span in spans {
219 let mut se = rmps::Serializer::new(&mut buf).with_struct_map();
220 span.serialize(&mut se).unwrap();
221 }
222 }
223 buf
224}
225
226fn fill_meta(span: &Span, env: Option<String>) -> HashMap<String, String> {
227 let mut meta = HashMap::new();
228 if let Some(env) = env {
229 meta.insert("env".to_string(), env);
230 }
231
232 if let Some(http) = &span.http {
233 meta.insert("http.status_code".to_string(), http.status_code.clone());
234 meta.insert("http.method".to_string(), http.method.clone());
235 meta.insert("http.url".to_string(), http.url.clone());
236 }
237 if let Some(error) = &span.error {
238 meta.insert("error.type".to_string(), error.r#type.clone());
239 meta.insert("error.msg".to_string(), error.msg.clone());
240 meta.insert("error.stack".to_string(), error.stack.clone());
241 }
242 if let Some(sql) = &span.sql {
243 meta.insert("sql.query".to_string(), sql.query.clone());
244 meta.insert("sql.rows".to_string(), sql.rows.clone());
245 meta.insert("sql.db".to_string(), sql.db.clone());
246 }
247 for (key, value) in &span.tags {
248 meta.insert(key.to_string(), value.to_string());
249 }
250 meta
251}
252
253fn fill_metrics(priority: u32) -> HashMap<String, f64> {
254 let mut metrics = HashMap::new();
255 metrics.insert("_sampling_priority_v1".to_string(), f64::from(priority));
256 metrics
257}
258
259fn map_to_raw_spans(trace: &Trace, env: Option<String>, service: String) -> Vec<RawSpan> {
260 let mut traces = Vec::new();
261 for span in &trace.spans {
262 traces.push(RawSpan {
263 service: service.clone(),
264 trace_id: trace.id,
265 span_id: span.id,
266 name: span.name.clone(),
267 resource: span.resource.clone(),
268 parent_id: span.parent_id,
269 start: duration_to_nanos(span.start.duration_since(UNIX_EPOCH).unwrap()),
270 duration: duration_to_nanos(span.duration),
271 error: if span.error.is_some() { 1 } else { 0 },
272 r#type: span.r#type.clone(),
273 meta: fill_meta(&span, env.clone()),
274 metrics: fill_metrics(trace.priority),
275 });
276 }
277 traces
278}
279
280fn duration_to_nanos(duration: Duration) -> u64 {
281 duration.as_secs() * 1_000_000_000 + duration.subsec_nanos() as u64
282}
283
284#[cfg(test)]
285mod tests {
286 extern crate rand;
287
288 use super::*;
289
290 use rand::Rng;
291 use serde_json::json;
292
293 #[tokio::test]
294 #[ignore]
295 async fn test_send_trace() {
296 let config = Config {
297 service: String::from("service_name"),
298 ..Default::default()
299 };
300 let client = Client::new(config);
301 let trace = a_trace();
302 client.send_trace(trace);
303 }
304
305 #[tokio::test]
306 async fn test_map_to_raw_spans() {
307 let config = Config {
308 service: String::from("service_name"),
309 env: Some(String::from("staging")),
310 ..Default::default()
311 };
312 let trace = a_trace();
313
314 let mut expected = Vec::new();
315 for span in &trace.spans {
316 let mut meta: HashMap<String, String> = HashMap::new();
317 meta.insert("env".to_string(), config.env.clone().unwrap());
318 if let Some(http) = &span.http {
319 meta.insert("http.url".to_string(), http.url.clone());
320 meta.insert("http.method".to_string(), http.method.clone());
321 meta.insert("http.status_code".to_string(), http.status_code.clone());
322 }
323
324 let mut metrics = HashMap::new();
325 metrics.insert(
326 "_sampling_priority_v1".to_string(),
327 f64::from(trace.priority),
328 );
329
330 expected.push(RawSpan {
331 trace_id: trace.id,
332 span_id: span.id,
333 parent_id: span.parent_id,
334 name: span.name.clone(),
335 resource: span.resource.clone(),
336 service: config.service.clone(),
337 r#type: span.r#type.clone(),
338 start: duration_to_nanos(span.start.duration_since(UNIX_EPOCH).unwrap()),
339 duration: duration_to_nanos(span.duration),
340 error: 0,
341 meta: meta,
342 metrics: metrics,
343 });
344 }
345 let raw_spans = map_to_raw_spans(&trace, config.env, config.service);
346
347 assert_eq!(raw_spans, expected);
348 }
349
350 #[tokio::test]
351 async fn test_message_pack_serialization() {
352 let generate_span = || {
353 let mut rng = rand::thread_rng();
354 let now = SystemTime::now()
355 .duration_since(UNIX_EPOCH)
356 .unwrap()
357 .as_secs();
358 RawSpan {
359 trace_id: rng.gen::<u64>(),
360 span_id: rng.gen::<u64>(),
361 parent_id: None,
362 name: String::from("request"),
363 resource: String::from("/home"),
364 service: String::from("service_name"),
365 r#type: String::from("web"),
366 start: now * 1_000_000_000,
367 duration: 4853472865,
368 error: 0,
369 meta: std::collections::HashMap::new(),
370 metrics: std::collections::HashMap::new(),
371 }
372 };
373
374 let traces = (0..3).map(|_| vec![generate_span()]).collect::<Vec<_>>();
375 let result = serialize_as_msgpack(traces.clone());
376
377 let msgpack_as_json: serde_json::Value = rmp_serde::from_read_ref(&result).unwrap();
378
379 assert_eq!(msgpack_as_json, json!(traces));
383 }
384
385 fn a_trace() -> Trace {
386 let mut rng = rand::thread_rng();
387 Trace {
388 id: rng.gen::<u64>(),
389 priority: 1,
390 spans: vec![Span {
391 id: rng.gen::<u64>(),
392 name: String::from("request"),
393 resource: String::from("/home/v3"),
394 r#type: String::from("web"),
395 start: SystemTime::now(),
396 duration: Duration::from_secs(2),
397 parent_id: None,
398 http: Some(HttpInfo {
399 url: String::from("/home/v3/2?trace=true"),
400 method: String::from("GET"),
401 status_code: String::from("200"),
402 }),
403 error: None,
404 sql: None,
405 tags: HashMap::new(),
406 }],
407 }
408 }
409}