datadog_apm_sync/
client.rs

1use crate::{api::RawSpan, model::Span};
2
3use attohttpc;
4use chrono::{DateTime, Duration, TimeZone, Utc};
5use log::{warn, Level as LogLevel, Log, Record};
6use serde_json::to_string;
7use std::{
8    cell::Cell,
9    collections::{HashMap, VecDeque},
10    sync::{
11        atomic::{AtomicU16, AtomicU32, Ordering},
12        mpsc,
13    },
14};
15
16#[cfg(feature = "json")]
17use log::kv;
18
/// Configuration settings for the client.
///
/// `Config::default()` supplies the defaults mentioned on the fields below.
#[derive(Clone, Debug)]
pub struct Config {
    /// Datadog APM service name.
    pub service: String,
    /// Datadog APM environment; `None` by default.
    pub env: Option<String>,
    /// Datadog agent host name or IP address; defaults to `localhost`.
    pub host: String,
    /// Datadog agent port, stored as a string; defaults to `8126`.
    pub port: String,
    /// Optional logging configuration. When present, the tracer is also registered as the
    /// global `log` logger.
    pub logging_config: Option<LoggingConfig>,
    /// APM configuration used to set up APM Analytics (disabled by default).
    pub apm_config: ApmConfig,
    /// When `true`, the tracer is installed as the global `tracing` subscriber.
    pub enable_tracing: bool,
    /// Number of threads that send the HTTP messages to the Datadog agent.
    pub num_client_send_threads: u32,
}
39
40impl Default for Config {
41    fn default() -> Self {
42        Config {
43            env: None,
44            host: "localhost".to_string(),
45            port: "8126".to_string(),
46            service: "".to_string(),
47            logging_config: None,
48            apm_config: ApmConfig::default(),
49            enable_tracing: false,
50            num_client_send_threads: 4,
51        }
52    }
53}
54
/// Settings applied when the tracer also acts as the process logger.
#[derive(Clone, Debug)]
pub struct LoggingConfig {
    /// Most verbose level that is still logged; more verbose records are discarded.
    pub level: LogLevel,
    /// `chrono` format string used to render each record's timestamp.
    pub time_format: String,
    /// A record is skipped when its module path contains any of these substrings.
    pub mod_filter: Vec<&'static str>,
    /// A record is skipped when its message contains any of these substrings.
    pub body_filter: Vec<&'static str>,
}
62
63impl Default for LoggingConfig {
64    fn default() -> Self {
65        LoggingConfig {
66            level: LogLevel::Info,
67            time_format: "%Y-%m-%d %H:%M:%S%z".to_string(),
68            mod_filter: Vec::new(),
69            body_filter: Vec::new(),
70        }
71    }
72}
73
/// APM-related settings handed to `RawSpan::from_span` when spans are serialized.
#[derive(Clone, Debug)]
pub struct ApmConfig {
    /// Presumably toggles APM Analytics for the emitted spans; confirm against `RawSpan`.
    pub apm_enabled: bool,
    /// Sampling priority value; its interpretation lives in `RawSpan::from_span` (not shown here).
    pub sample_priority: f64,
    /// Sampling rate; `DatadogTracing::new` also publishes it as the global sampling rate
    /// when tracing is enabled.
    pub sample_rate: f64,
}
80
81impl Default for ApmConfig {
82    fn default() -> Self {
83        ApmConfig {
84            apm_enabled: false,
85            sample_rate: 0f64,
86            sample_priority: 0f64,
87        }
88    }
89}
90
// Timestamp in nanoseconds, produced in this file from `Utc::now().timestamp_nanos()`.
type TimeInNanos = u64;
// Identifier this module assigns to each thread from a process-wide counter.
type ThreadId = u32;
// Identifier shared by all spans that belong to one trace.
type TraceId = u64;
// Identifier of a single span.
type SpanId = u64;
95
/// Owned snapshot of a `log` record, sent to the trace server thread for printing.
#[derive(Clone, Debug)]
struct LogRecord {
    /// Module-assigned ID of the thread that produced the record.
    pub thread_id: ThreadId,
    /// Severity of the record.
    pub level: log::Level,
    /// Time at which the record was captured.
    pub time: DateTime<Utc>,
    /// Fully formatted log message.
    pub msg_str: String,
    /// Module path of the call site, when the record carries one.
    pub module: Option<String>,
    /// Structured key/value pairs attached to the record (only with the `json` feature).
    #[cfg(feature = "json")]
    pub key_values: HashMap<String, String>,
}
106
/// Messages sent from the logging/tracing front end to the trace server thread.
#[derive(Clone, Debug)]
enum TraceCommand {
    /// Print a log record.
    Log(LogRecord),
    /// Register a newly created span.
    NewSpan(TimeInNanos, NewSpanData),
    /// The given thread entered the given span.
    Enter(TimeInNanos, ThreadId, SpanId),
    /// The given span was exited.
    Exit(TimeInNanos, SpanId),
    /// The given span was closed; the timestamp is used as its end time.
    CloseSpan(TimeInNanos, SpanId),
    /// A `tracing` event: capture time in nanoseconds, originating thread, the recorded
    /// fields, and the wall-clock time used as the trace end when the event carries a
    /// `send_trace` field.
    Event(
        TimeInNanos,
        ThreadId,
        HashMap<String, String>,
        DateTime<Utc>,
    ),
}
121
/// Data captured when a span is created, before it is turned into a `Span`.
#[derive(Debug, Clone)]
struct NewSpanData {
    /// Trace the span belongs to.
    pub trace_id: TraceId,
    /// ID of the span itself.
    pub id: SpanId,
    /// Span name, taken from the `tracing` metadata name.
    pub name: String,
    /// Span resource, taken from the `tracing` metadata target.
    pub resource: String,
    /// Wall-clock time at which the span was created.
    pub start: DateTime<Utc>,
}
130
/// All spans that belong to a single trace.
#[derive(Clone, Debug)]
struct SpanCollection {
    // Spans that have been closed and are waiting to be sent.
    completed_spans: Vec<Span>,
    // The span that started the trace; the parent of spans opened while none is entered.
    parent_span: Span,
    // Spans that have been started but not yet closed, oldest first.
    current_spans: VecDeque<Span>,
    // IDs of spans that have been entered and not yet exited; the back is the most recent.
    entered_spans: VecDeque<u64>,
}
138
139impl SpanCollection {
140    fn new(parent_span: Span) -> Self {
141        SpanCollection {
142            completed_spans: vec![],
143            parent_span,
144            current_spans: VecDeque::new(),
145            entered_spans: VecDeque::new(),
146        }
147    }
148
149    // Open a span by inserting the span into the "current" span map by ID.
150    fn start_span(&mut self, span: Span) {
151        let parent_id = Some(self.current_span_id().unwrap_or(self.parent_span.id));
152        self.current_spans.push_back(Span { parent_id, ..span });
153    }
154
155    // Move span to "completed" based on ID.
156    fn end_span(&mut self, nanos: u64, span_id: SpanId) {
157        let pos = self.current_spans.iter().rposition(|i| i.id == span_id);
158        if let Some(i) = pos {
159            self.current_spans.remove(i).map(|span| {
160                self.completed_spans.push(Span {
161                    duration: Duration::nanoseconds(nanos as i64 - span.start.timestamp_nanos()),
162                    ..span
163                })
164            });
165        }
166    }
167
168    // Enter a span (mark it on stack)
169    fn enter_span(&mut self, span_id: SpanId) {
170        self.entered_spans.push_back(span_id);
171    }
172
173    // Exit a span (pop from stack)
174    fn exit_span(&mut self, span_id: SpanId) {
175        let pos = self.entered_spans.iter().rposition(|i| *i == span_id);
176        if let Some(i) = pos {
177            self.entered_spans.remove(i);
178        }
179    }
180
181    /// Get the id, if present, of the most current span for this trace
182    fn current_span_id(&self) -> Option<u64> {
183        self.entered_spans.back().map(|i| *i)
184    }
185
186    fn add_tag(&mut self, k: String, v: String) {
187        self.current_spans.back_mut().map(|span| {
188            span.tags.insert(k.clone(), v.clone());
189        });
190        self.parent_span.tags.insert(k, v);
191    }
192
193    fn drain_current(mut self) -> Self {
194        std::mem::take(&mut self.current_spans)
195            .into_iter()
196            .for_each(|span| {
197                self.completed_spans.push(Span {
198                    duration: Utc::now().signed_duration_since(span.start),
199                    ..span
200                })
201            });
202        self
203    }
204
205    fn drain(self, end_time: DateTime<Utc>) -> Vec<Span> {
206        let parent_span = Span {
207            duration: end_time.signed_duration_since(self.parent_span.start.clone()),
208            ..self.parent_span.clone()
209        };
210        let mut ret = self.drain_current().completed_spans;
211        ret.push(parent_span);
212        ret
213    }
214}
215
/// Book-keeping for every trace that is currently being collected.
struct SpanStorage {
    // Span collections keyed by trace ID.
    traces: HashMap<TraceId, SpanCollection>,
    // Trace each started (and not yet closed) span belongs to.
    spans_to_trace_id: HashMap<SpanId, TraceId>,
    // Trace each thread is currently inside, used for logging and events.
    current_trace_for_thread: HashMap<ThreadId, TraceId>,
    // Reverse of `current_trace_for_thread`, so a trace can be detached from its thread.
    current_thread_for_trace: HashMap<TraceId, ThreadId>,
}
222
223impl SpanStorage {
224    fn new() -> Self {
225        SpanStorage {
226            traces: HashMap::new(),
227            spans_to_trace_id: HashMap::new(),
228            current_trace_for_thread: HashMap::new(),
229            current_thread_for_trace: HashMap::new(),
230        }
231    }
232
233    // Either start a new trace with the span's trace ID (if there is no span already
234    // pushed for that trace ID), or push the span on the "current" stack of spans for that
235    // trace ID.  If "parent" is true, that means we need a parent span pushed for this to
236    // represent the entire trace.
237    fn start_span(&mut self, span: Span) {
238        let trace_id = span.trace_id;
239        self.spans_to_trace_id.insert(span.id, span.trace_id);
240        if let Some(ss) = self.traces.get_mut(&trace_id) {
241            ss.start_span(span);
242        } else {
243            self.traces.insert(trace_id, SpanCollection::new(span));
244        }
245    }
246
247    /// End a span and update the current "top of the stack"
248    fn end_span(&mut self, nanos: u64, span_id: SpanId) {
249        if let Some(trace_id) = self.spans_to_trace_id.remove(&span_id) {
250            if let Some(ref mut ss) = self.traces.get_mut(&trace_id) {
251                ss.end_span(nanos, span_id);
252            }
253        }
254    }
255
256    /// Enter a span for trace, and keep track so that new spans get the correct parent.
257    /// Keep track of which trace the current thread is in (for logging and events)
258    fn enter_span(&mut self, thread_id: ThreadId, span_id: SpanId) {
259        let t_id = self.spans_to_trace_id.get(&span_id).map(|i| *i);
260        if let Some(trace_id) = t_id {
261            if let Some(ref mut ss) = self.traces.get_mut(&trace_id) {
262                ss.enter_span(span_id);
263                if ss.entered_spans.len() == 1 {
264                    self.set_current_trace(thread_id, trace_id);
265                }
266            }
267        }
268    }
269
270    /// Exit a span for trace, and keep track so that new spans get the correct parent
271    fn exit_span(&mut self, span_id: SpanId) {
272        let trace_id = self.spans_to_trace_id.get(&span_id).cloned();
273        if let Some(trace_id) = trace_id {
274            if let Some(ref mut ss) = self.traces.get_mut(&trace_id) {
275                ss.exit_span(span_id);
276                if ss.entered_spans.is_empty() {
277                    self.remove_current_trace(trace_id);
278                }
279            }
280        }
281    }
282
283    /// Drain the span collection for this trace so we can send the trace through to Datadog,
284    /// This effectively ends the trace.  Any new spans on this trace ID will have the same
285    /// trace ID, but have a new parent span (and a new trace line in Datadog).
286    fn drain_completed(&mut self, trace_id: TraceId, end: DateTime<Utc>) -> Vec<Span> {
287        if let Some(ss) = self.traces.remove(&trace_id) {
288            ss.drain(end)
289        } else {
290            vec![]
291        }
292    }
293
294    /// Record tag info onto a span
295    fn span_record_tag(&mut self, trace_id: TraceId, key: String, value: String) {
296        if let Some(ref mut ss) = self.traces.get_mut(&trace_id) {
297            ss.add_tag(key, value)
298        }
299    }
300
301    fn get_trace_id_for_thread(&self, thread_id: ThreadId) -> Option<u64> {
302        self.current_trace_for_thread.get(&thread_id).map(|i| *i)
303    }
304
305    fn set_current_trace(&mut self, thread_id: ThreadId, trace_id: TraceId) {
306        self.current_trace_for_thread.insert(thread_id, trace_id);
307        self.current_thread_for_trace.insert(trace_id, thread_id);
308    }
309
310    fn remove_current_trace(&mut self, trace_id: TraceId) {
311        let thread_id = self.current_thread_for_trace.remove(&trace_id);
312        if let Some(thr) = thread_id {
313            self.current_trace_for_thread.remove(&thr);
314        }
315    }
316
317    /// Get the id, if present, of the most current span for the given trace
318    fn current_span_id(&self, trace_id: TraceId) -> Option<SpanId> {
319        self.traces.get(&trace_id).and_then(|s| s.current_span_id())
320    }
321}
322
323fn trace_server_loop(
324    client: DdAgentClient,
325    buffer_receiver: mpsc::Receiver<TraceCommand>,
326    log_config: Option<LoggingConfig>,
327) {
328    let mut storage = SpanStorage::new();
329
330    loop {
331        match buffer_receiver.recv() {
332            Ok(TraceCommand::Log(record)) => {
333                if let Some(ref lc) = log_config {
334                    let skip = record
335                        .module
336                        .as_ref()
337                        .map(|m: &String| lc.mod_filter.iter().any(|filter| m.contains(*filter)))
338                        .unwrap_or(false);
339                    let body_skip = lc
340                        .body_filter
341                        .iter()
342                        .filter(|f| record.msg_str.contains(*f))
343                        .next()
344                        .is_some();
345                    if !skip && !body_skip {
346                        match storage
347                            .get_trace_id_for_thread(record.thread_id)
348                            .and_then(|tr_id| {
349                                storage.current_span_id(tr_id).map(|sp_id| (tr_id, sp_id))
350                            }) {
351                            Some((tr, sp)) => {
352                                // Both trace and span are active on this thread
353                                let log_body = build_log_body(&record);
354                                println!(
355                                    "{time} {level} [trace-id:{traceid} span-id:{spanid}] [{module}] {body}",
356                                    time = record.time.format(lc.time_format.as_ref()),
357                                    traceid = tr,
358                                    spanid = sp,
359                                    level = record.level,
360                                    module = record.module.unwrap_or("-".to_string()),
361                                    body = log_body
362                                );
363                            }
364                            _ => {
365                                let log_body = build_log_body(&record);
366                                // Both trace and span are not active on this thread
367                                println!(
368                                    "{time} {level} [{module}] {body}",
369                                    time = record.time.format(lc.time_format.as_ref()),
370                                    level = record.level,
371                                    module = record.module.unwrap_or("-".to_string()),
372                                    body = log_body
373                                );
374                            }
375                        }
376                    }
377                }
378            }
379            Ok(TraceCommand::NewSpan(_nanos, data)) => {
380                storage.start_span(Span {
381                    id: data.id,
382                    trace_id: data.trace_id,
383                    tags: HashMap::new(),
384                    parent_id: None,
385                    start: data.start,
386                    name: data.name,
387                    resource: data.resource,
388                    sql: None,
389                    duration: Duration::seconds(0),
390                });
391            }
392            Ok(TraceCommand::Enter(_nanos, thread_id, span_id)) => {
393                storage.enter_span(thread_id, span_id);
394            }
395            Ok(TraceCommand::Exit(_nanos, span_id)) => {
396                storage.exit_span(span_id);
397            }
398            Ok(TraceCommand::Event(_nanos, thread_id, mut event, time)) => {
399                // Events are only valid if the trace_id flag is set
400                // Send trace specified the trace to send, so use that instead of the thread's
401                // current trace.
402                if let Some(send_trace_id) = event
403                    .remove("send_trace")
404                    .and_then(|t| t.parse::<u64>().ok())
405                {
406                    let send_vec = storage.drain_completed(send_trace_id, time);
407                    // Thread has ended this trace.  Until it enters a new span, it
408                    // is not in a trace.
409                    storage.remove_current_trace(send_trace_id);
410                    if !send_vec.is_empty() {
411                        client.send(send_vec);
412                    }
413                }
414                // Tag events only work inside a trace, so get the trace from the thread.
415                // No trace means no tagging.
416                let trace_id_opt = storage.get_trace_id_for_thread(thread_id);
417                if let Some(trace_id) = trace_id_opt {
418                    if let Some(type_event) = event.remove("error.etype") {
419                        storage.span_record_tag(trace_id, "error.type".to_string(), type_event)
420                    }
421                    event
422                        .into_iter()
423                        .for_each(|(k, v)| storage.span_record_tag(trace_id, k, v));
424                }
425            }
426            Ok(TraceCommand::CloseSpan(nanos, span_id)) => {
427                storage.end_span(nanos, span_id);
428            }
429            Err(_) => {
430                return;
431            }
432        }
433    }
434}
435
436fn build_log_body(record: &LogRecord) -> String {
437    #[cfg(not(feature = "json"))]
438    {
439        record.msg_str.clone()
440    }
441    #[cfg(feature = "json")]
442    {
443        if record.key_values.is_empty() {
444            record.msg_str.clone()
445        } else {
446            let mut body = HashMap::new();
447            body.insert("message".to_string(), record.msg_str.clone());
448            for (k, v) in &record.key_values {
449                body.insert(k.clone(), v.clone());
450            }
451            serde_json::to_string(&body).unwrap_or_else(|_| "".to_string())
452        }
453    }
454}
455
/// Handle that acts as both the `log` logger and the `tracing` subscriber; it forwards
/// everything as commands to the trace server thread.
#[derive(Debug, Clone)]
pub struct DatadogTracing {
    // Sending half of the command channel read by `trace_server_loop`.
    buffer_sender: mpsc::Sender<TraceCommand>,
    // Logging settings; `None` disables both log output and span/event collection.
    log_config: Option<LoggingConfig>,
}
461
// Manually asserts that a `DatadogTracing` may be shared between threads; the compiler
// does not verify this claim.
// NOTE(review): the struct holds an `mpsc::Sender<TraceCommand>` and an
// `Option<LoggingConfig>`. Confirm that sharing the sender by reference is sound on the
// minimum supported toolchain (std presumably only made `Sender` itself `Sync` in 1.72).
unsafe impl Sync for DatadogTracing {}
463
464impl DatadogTracing {
465    pub fn new(config: Config) -> DatadogTracing {
466        let (buffer_sender, buffer_receiver) = mpsc::channel();
467        let sample_rate = config.apm_config.sample_rate;
468        let client = DdAgentClient::new(&config);
469
470        let log_config = config.logging_config.clone();
471        std::thread::spawn(move || trace_server_loop(client, buffer_receiver, log_config));
472
473        let tracer = DatadogTracing {
474            buffer_sender,
475            log_config: config.logging_config,
476        };
477
478        if let Some(ref lc) = tracer.log_config {
479            let _ = log::set_boxed_logger(Box::new(tracer.clone()));
480            log::set_max_level(lc.level.to_level_filter());
481        }
482        if config.enable_tracing {
483            // Only set the global sample rate once when the tracer is set as the global tracer.
484            // This must be marked unsafe because we are overwriting a global, but it only gets done
485            // once in a process's lifetime.
486            unsafe {
487                if SAMPLING_RATE.is_none() {
488                    SAMPLING_RATE = Some(sample_rate);
489                }
490            }
491
492            tracing::subscriber::set_global_default(tracer.clone()).unwrap_or_else(|_| {
493                warn!(
494                    "Global subscriber has already been set!  \
495                           This should only be set once in the executable."
496                )
497            });
498        }
499        tracer
500    }
501
502    pub fn get_global_sampling_rate() -> f64 {
503        unsafe { SAMPLING_RATE.clone().unwrap_or(0f64) }
504    }
505
506    fn send_log(&self, record: LogRecord) -> Result<(), ()> {
507        self.buffer_sender
508            .send(TraceCommand::Log(record))
509            .map(|_| ())
510            .map_err(|_| ())
511    }
512
513    fn send_new_span(&self, nanos: u64, span: NewSpanData) -> Result<(), ()> {
514        self.buffer_sender
515            .send(TraceCommand::NewSpan(nanos, span))
516            .map(|_| ())
517            .map_err(|_| ())
518    }
519
520    fn send_enter_span(&self, nanos: u64, thread_id: ThreadId, id: SpanId) -> Result<(), ()> {
521        self.buffer_sender
522            .send(TraceCommand::Enter(nanos, thread_id, id))
523            .map(|_| ())
524            .map_err(|_| ())
525    }
526
527    fn send_exit_span(&self, nanos: u64, id: SpanId) -> Result<(), ()> {
528        self.buffer_sender
529            .send(TraceCommand::Exit(nanos, id))
530            .map(|_| ())
531            .map_err(|_| ())
532    }
533
534    fn send_close_span(&self, nanos: u64, span_id: SpanId) -> Result<(), ()> {
535        self.buffer_sender
536            .send(TraceCommand::CloseSpan(nanos, span_id))
537            .map(|_| ())
538            .map_err(|_| ())
539    }
540
541    fn send_event(
542        &self,
543        nanos: u64,
544        thread_id: ThreadId,
545        event: HashMap<String, String>,
546        time: DateTime<Utc>,
547    ) -> Result<(), ()> {
548        self.buffer_sender
549            .send(TraceCommand::Event(nanos, thread_id, event, time))
550            .map(|_| ())
551            .map_err(|_| ())
552    }
553}
554
555fn log_level_to_trace_level(level: log::Level) -> tracing::Level {
556    use log::Level::*;
557    match level {
558        Error => tracing::Level::INFO,
559        Warn => tracing::Level::INFO,
560        Info => tracing::Level::INFO,
561        Debug => tracing::Level::DEBUG,
562        Trace => tracing::Level::TRACE,
563    }
564}
565
// Supplies the low 16 bits of the IDs produced by `create_unique_id64`.
static UNIQUEID_COUNTER: AtomicU16 = AtomicU16::new(0);
// Next value to hand out as a thread's `THREAD_ID`.
static THREAD_COUNTER: AtomicU32 = AtomicU32::new(0);

// Global sampling rate; `None` until `DatadogTracing::new` publishes one.
// NOTE(review): this is a `static mut` accessed through `unsafe` without synchronization.
static mut SAMPLING_RATE: Option<f64> = None;

thread_local! {
    // Per-thread ID, taken from `THREAD_COUNTER` when the thread first reads it.
    static THREAD_ID: ThreadId = THREAD_COUNTER.fetch_add(1, Ordering::Relaxed);
    // Span this thread entered most recently; reset to `None` whenever a span is exited.
    static CURRENT_SPAN_ID: Cell<Option<SpanId>> = Cell::new(None);
}
575
576pub fn get_thread_id() -> ThreadId {
577    THREAD_ID.with(|id| *id)
578}
579
580pub fn get_current_span_id() -> Option<SpanId> {
581    CURRENT_SPAN_ID.with(|id| id.get())
582}
583
584pub fn set_current_span_id(new_id: Option<SpanId>) {
585    CURRENT_SPAN_ID.with(|id| {
586        id.set(new_id);
587    })
588}
589
590// Format
591// |                       6 bytes                       |      2 bytes    |
592// +--------+--------+--------+--------+--------+--------+--------+--------+
593// |     number of milliseconds since epoch (1970)       | static counter  |
594// +--------+--------+--------+--------+--------+--------+--------+--------+
595// 0        8        16       24       32       40       48       56       64
596//
597// This will hold up to the year 10,000 before it cycles.
598pub fn create_unique_id64() -> u64 {
599    let now = Utc::now();
600    let baseline = Utc.timestamp(0, 0);
601
602    let millis_since_epoch: u64 =
603        (now.signed_duration_since(baseline).num_milliseconds() << 16) as u64;
604    millis_since_epoch + UNIQUEID_COUNTER.fetch_add(1, Ordering::Relaxed) as u64
605}
606
/// `tracing` field visitor that collects recorded fields into a string map.
pub struct HashMapVisitor {
    // Field name -> stringified value, for every field recorded so far.
    fields: HashMap<String, String>,
}
610
611impl HashMapVisitor {
612    fn new() -> Self {
613        // Event/Span vectors should never have more than ten fields.
614        HashMapVisitor {
615            fields: HashMap::new(),
616        }
617    }
618    fn add_value(&mut self, field: &tracing::field::Field, value: String) {
619        self.fields.insert(field.name().to_string(), value);
620    }
621}
622
623impl tracing::field::Visit for HashMapVisitor {
624    fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
625        self.add_value(field, format!("{}", value));
626    }
627    fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
628        self.add_value(field, format!("{}", value));
629    }
630    fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
631        self.add_value(field, format!("{}", value));
632    }
633    fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
634        self.add_value(field, format!("{}", value));
635    }
636    fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn std::fmt::Debug) {
637        // Do nothing
638    }
639}
640
641impl tracing::Subscriber for DatadogTracing {
642    fn enabled(&self, metadata: &tracing::Metadata<'_>) -> bool {
643        match self.log_config {
644            Some(ref lc) => log_level_to_trace_level(lc.level) >= *metadata.level(),
645            None => false,
646        }
647    }
648
649    fn new_span(&self, span: &tracing::span::Attributes<'_>) -> tracing::span::Id {
650        let nanos = Utc::now().timestamp_nanos() as u64;
651        let mut new_span_visitor = HashMapVisitor::new();
652        span.record(&mut new_span_visitor);
653        let trace_id = new_span_visitor
654            .fields
655            .remove("trace_id")
656            .and_then(|s| s.parse::<u64>().ok())
657            .unwrap_or(Utc::now().timestamp_nanos() as u64);
658        let span_id = Utc::now().timestamp_nanos() as u64 + 1;
659        let new_span = NewSpanData {
660            id: span_id,
661            trace_id,
662            start: Utc::now(),
663            resource: span.metadata().target().to_string(),
664            name: span.metadata().name().to_string(),
665        };
666        self.send_new_span(nanos, new_span).unwrap_or(());
667        tracing::span::Id::from_u64(span_id)
668    }
669
670    fn record(&self, _span: &tracing::span::Id, _values: &tracing::span::Record<'_>) {}
671
672    fn record_follows_from(&self, _span: &tracing::span::Id, _follows: &tracing::span::Id) {}
673
674    fn event(&self, event: &tracing::Event<'_>) {
675        let nanos = Utc::now().timestamp_nanos() as u64;
676        let thread_id = get_thread_id();
677        let mut new_evt_visitor = HashMapVisitor::new();
678        event.record(&mut new_evt_visitor);
679
680        self.send_event(nanos, thread_id, new_evt_visitor.fields, Utc::now())
681            .unwrap_or(());
682    }
683
684    fn enter(&self, span: &tracing::span::Id) {
685        let nanos = Utc::now().timestamp_nanos() as u64;
686        let thread_id = get_thread_id();
687        self.send_enter_span(nanos, thread_id, span.clone().into_u64())
688            .unwrap_or(());
689        set_current_span_id(Some(span.into_u64()));
690    }
691
692    fn exit(&self, span: &tracing::span::Id) {
693        let nanos = Utc::now().timestamp_nanos() as u64;
694        self.send_exit_span(nanos, span.clone().into_u64())
695            .unwrap_or(());
696        set_current_span_id(None);
697    }
698
699    fn try_close(&self, span: tracing::span::Id) -> bool {
700        let nanos = Utc::now().timestamp_nanos() as u64;
701        self.send_close_span(nanos, span.into_u64()).unwrap_or(());
702        false
703    }
704}
705
// Collector for the structured key/value pairs of a `log` record, stored as strings.
#[cfg(feature = "json")]
struct KeyValueMap(HashMap<String, String>);
708
#[cfg(feature = "json")]
impl<'kvs> kv::VisitSource<'kvs> for KeyValueMap {
    /// Stores the pair as strings; a repeated key replaces the earlier value. Never fails.
    fn visit_pair(&mut self, key: kv::Key<'kvs>, value: kv::Value<'kvs>) -> Result<(), kv::Error> {
        let KeyValueMap(pairs) = self;
        pairs.insert(key.to_string(), value.to_string());
        Ok(())
    }
}
716
/// Collects the record's structured key/value pairs into a string map. A failure while
/// visiting the pairs is printed to stdout, and whatever was collected so far is returned.
#[cfg(feature = "json")]
fn build_key_value_map<'a>(record: &Record<'a>) -> HashMap<String, String> {
    let mut collector = KeyValueMap(HashMap::new());
    if let Err(e) = record.key_values().visit(&mut collector) {
        println!("Error building key value map: {:?}", e);
    }

    let KeyValueMap(pairs) = collector;
    pairs
}
727
728impl Log for DatadogTracing {
729    fn enabled(&self, metadata: &log::Metadata) -> bool {
730        if let Some(ref lc) = self.log_config {
731            metadata.level() <= lc.level
732        } else {
733            false
734        }
735    }
736
737    fn log(&self, record: &Record) {
738        if let Some(ref lc) = self.log_config {
739            #[cfg(feature = "json")]
740            let key_values = build_key_value_map(record);
741            if record.level() <= lc.level {
742                let thread_id = get_thread_id();
743                let now = chrono::Utc::now();
744                let msg_str = format!("{}", record.args());
745                let log_rec = LogRecord {
746                    thread_id,
747                    level: record.level(),
748                    time: now,
749                    module: record.module_path().map(|s| s.to_string()),
750                    msg_str,
751                    #[cfg(feature = "json")]
752                    key_values,
753                };
754                self.send_log(log_rec).unwrap_or_else(|_| ());
755            }
756        }
757    }
758
759    fn flush(&self) {}
760}
761
/// Handle used to hand finished traces to the threads that post them to the Datadog agent.
#[derive(Debug, Clone)]
struct DdAgentClient {
    // Sending half of the channel shared by all sender threads; each message is the
    // list of spans of one trace.
    client_sender: crossbeam_channel::Sender<Vec<Span>>,
}
766
767impl DdAgentClient {
768    fn new(config: &Config) -> Self {
769        let (client_sender, client_requests) = crossbeam_channel::unbounded();
770
771        for _ in 0..config.num_client_send_threads {
772            let env = config.env.clone();
773            let service = config.service.clone();
774            let host = config.host.clone();
775            let port = config.port.clone();
776            let apm_config = config.apm_config.clone();
777            let cr_channel = client_requests.clone();
778            std::thread::spawn(move || {
779                DdAgentClient::thread_loop(
780                    cr_channel,
781                    env,
782                    format!("http://{}:{}/v0.3/traces", host, port),
783                    service,
784                    apm_config,
785                )
786            });
787        }
788        DdAgentClient { client_sender }
789    }
790
791    fn send(&self, stack: Vec<Span>) {
792        self.client_sender.send(stack).unwrap_or_else(|_| {
793            println!("Tracing send error: Channel closed!");
794        });
795    }
796
797    fn thread_loop(
798        client_requests: crossbeam_channel::Receiver<Vec<Span>>,
799        env: Option<String>,
800        endpoint: String,
801        service: String,
802        apm_config: ApmConfig,
803    ) {
804        // Loop as long as the channel is open
805        while let Ok(stack) = client_requests.recv() {
806            let count = stack.len();
807            let spans: Vec<Vec<RawSpan>> = vec![stack
808                .into_iter()
809                .map(|s| RawSpan::from_span(&s, &service, &env, &apm_config))
810                .collect()];
811            match to_string(&spans) {
812                Err(e) => println!("Couldn't encode payload for datadog: {:?}", e),
813                Ok(payload) => {
814                    let req = attohttpc::post(&endpoint)
815                        .header("Content-Length", payload.len() as u64)
816                        .header("Content-Type", "application/json")
817                        .header("X-Datadog-Trace-Count", count)
818                        .text(&payload);
819
820                    match req.send() {
821                        Ok(resp) if !resp.is_success() => {
822                            println!("error from datadog agent: {:?}", resp)
823                        }
824                        Err(err) => println!("error sending traces to datadog: {:?}", err),
825                        _ => {}
826                    }
827                }
828            }
829        }
830    }
831}
832
833#[cfg(test)]
834mod tests {
835    use super::*;
836    use log::{debug, info, Level};
837    use tracing::{event, span};
838
839    fn long_call(trace_id: u64) {
840        let span = span!(tracing::Level::INFO, "long_call", trace_id = trace_id);
841        let _e = span.enter();
842        debug!("Waiting on I/O {}", trace_id);
843        sleep_call(trace_id);
844        info!("I/O Finished {}", trace_id);
845    }
846
847    fn sleep_call(trace_id: u64) {
848        let span = span!(tracing::Level::INFO, "sleep_call", trace_id = trace_id);
849        let _e = span.enter();
850        debug!("Long call {}", trace_id);
851        debug!(
852            "Current thread ID/span ID: {}/{:?}",
853            get_thread_id(),
854            get_current_span_id()
855        );
856        std::thread::sleep(std::time::Duration::from_millis(2000));
857    }
858
859    fn traced_func_no_send(trace_id: u64) {
860        let span = span!(
861            tracing::Level::INFO,
862            "traced_func_no_send",
863            trace_id = trace_id
864        );
865        let _e = span.enter();
866        debug!(
867            "Performing some function for id={}/{:?}",
868            trace_id,
869            get_current_span_id()
870        );
871        long_call(trace_id);
872    }
873
874    fn traced_http_func(trace_id: u64) {
875        let span = span!(
876            tracing::Level::INFO,
877            "traced_http_func",
878            trace_id = trace_id
879        );
880        let _e = span.enter();
881        debug!(
882            "Performing some function for id={}/{:?}",
883            trace_id,
884            get_current_span_id()
885        );
886        long_call(trace_id);
887        event!(
888            tracing::Level::INFO,
889            http.url = "http://test.test/",
890            http.status_code = "200",
891            http.method = "GET"
892        );
893        event!(tracing::Level::INFO, send_trace = trace_id);
894    }
895
896    fn traced_error_func(trace_id: u64) {
897        let span = span!(
898            tracing::Level::INFO,
899            "traced_error_func",
900            trace_id = trace_id
901        );
902        let _e = span.enter();
903        debug!(
904            "Performing some function for id={}/{:?}",
905            trace_id,
906            get_current_span_id()
907        );
908        long_call(trace_id);
909        event!(
910            tracing::Level::ERROR,
911            error.etype = "",
912            error.message = "Test error"
913        );
914        event!(
915            tracing::Level::ERROR,
916            http.url = "http://test.test/",
917            http.status_code = "400",
918            http.method = "GET"
919        );
920        event!(
921            tracing::Level::ERROR,
922            custom_tag = "good",
923            custom_tag2 = "test",
924            send_trace = trace_id
925        );
926    }
927
928    fn traced_error_func_single_event(trace_id: u64) {
929        let span = span!(
930            tracing::Level::INFO,
931            "traced_error_func_single_event",
932            trace_id = trace_id
933        );
934        let _e = span.enter();
935
936        debug!(
937            "Performing some function for id={}/{:?}",
938            trace_id,
939            get_current_span_id()
940        );
941        long_call(trace_id);
942        event!(
943            tracing::Level::ERROR,
944            send_trace = trace_id,
945            error.etype = "",
946            error.message = "Test error",
947            http.url = "http://test.test/",
948            http.status_code = "400",
949            http.method = "GET",
950            custom_tag = "good",
951            custom_tag2 = "test"
952        );
953    }
954
955    fn trace_config() {
956        let config = Config {
957            service: String::from("datadog_apm_test"),
958            env: Some("staging-01".into()),
959            logging_config: Some(LoggingConfig {
960                level: Level::Trace,
961                mod_filter: vec!["hyper", "mime"],
962                ..LoggingConfig::default()
963            }),
964            enable_tracing: true,
965            ..Default::default()
966        };
967        let _client = DatadogTracing::new(config);
968    }
969
970    #[test]
971    fn test_exit_child_span() {
972        trace_config();
973        let trace_id = 1u64;
974
975        let f1 = std::thread::spawn(move || {
976            let span = span!(tracing::Level::INFO, "parent_span", trace_id = trace_id);
977            let _e = span.enter();
978            info!("Inside parent_span, should print trace and span ID");
979            {
980                let span = span!(tracing::Level::INFO, "child_span", trace_id = trace_id);
981                let _e = span.enter();
982                info!("Inside child_span, should print trace and span ID");
983            }
984            info!("Back in parent_span, should print trace and span ID");
985        });
986        f1.join().unwrap();
987        event!(tracing::Level::INFO, send_trace = trace_id);
988        ::std::thread::sleep(::std::time::Duration::from_millis(1000));
989    }
990
991    #[test]
992    fn test_trace_one_func_stack() {
993        let trace_id = create_unique_id64();
994        trace_config();
995
996        debug!(
997            "Outside of span, this should be None: {:?}",
998            get_current_span_id()
999        );
1000        debug!(
1001            "Sampling rate is {}",
1002            DatadogTracing::get_global_sampling_rate()
1003        );
1004
1005        let f1 = std::thread::spawn(move || {
1006            traced_func_no_send(trace_id);
1007            event!(tracing::Level::INFO, send_trace = trace_id);
1008        });
1009
1010        debug!(
1011            "Same as before span, after span completes, this should be None: {:?}",
1012            get_current_span_id()
1013        );
1014        f1.join().unwrap();
1015        ::std::thread::sleep(::std::time::Duration::from_millis(1000));
1016    }
1017
1018    #[test]
1019    fn test_parallel_two_threads_two_traces() {
1020        let trace_id1 = create_unique_id64();
1021        let trace_id2 = create_unique_id64();
1022        trace_config();
1023        let f1 = std::thread::spawn(move || {
1024            traced_func_no_send(trace_id1);
1025            event!(tracing::Level::INFO, send_trace = trace_id1);
1026        });
1027        let f2 = std::thread::spawn(move || {
1028            traced_func_no_send(trace_id2);
1029            event!(tracing::Level::INFO, send_trace = trace_id2);
1030        });
1031
1032        f1.join().unwrap();
1033        f2.join().unwrap();
1034        ::std::thread::sleep(::std::time::Duration::from_millis(1000));
1035    }
1036
1037    #[test]
1038    fn test_parallel_two_threads_ten_traces() {
1039        let trace_id1 = create_unique_id64();
1040        let trace_id2 = create_unique_id64() + 1;
1041        let trace_id3 = create_unique_id64() + 2;
1042        let trace_id4 = create_unique_id64() + 3;
1043        let trace_id5 = create_unique_id64() + 4;
1044        let trace_id6 = create_unique_id64() + 5;
1045        let trace_id7 = create_unique_id64() + 6;
1046        let trace_id8 = create_unique_id64() + 7;
1047        let trace_id9 = create_unique_id64() + 8;
1048        let trace_id10 = create_unique_id64() + 9;
1049        trace_config();
1050        let f1 = std::thread::spawn(move || {
1051            traced_func_no_send(trace_id1);
1052            event!(tracing::Level::INFO, send_trace = trace_id1);
1053        });
1054        let f2 = std::thread::spawn(move || {
1055            traced_func_no_send(trace_id2);
1056            event!(tracing::Level::INFO, send_trace = trace_id2);
1057        });
1058        let f3 = std::thread::spawn(move || {
1059            traced_func_no_send(trace_id3);
1060            event!(tracing::Level::INFO, send_trace = trace_id3);
1061        });
1062        let f4 = std::thread::spawn(move || {
1063            traced_func_no_send(trace_id4);
1064            event!(tracing::Level::INFO, send_trace = trace_id4);
1065        });
1066        let f5 = std::thread::spawn(move || {
1067            traced_func_no_send(trace_id5);
1068            event!(tracing::Level::INFO, send_trace = trace_id5);
1069        });
1070        let f6 = std::thread::spawn(move || {
1071            traced_func_no_send(trace_id6);
1072            event!(tracing::Level::INFO, send_trace = trace_id6);
1073        });
1074        let f7 = std::thread::spawn(move || {
1075            traced_func_no_send(trace_id7);
1076            event!(tracing::Level::INFO, send_trace = trace_id7);
1077        });
1078        let f8 = std::thread::spawn(move || {
1079            traced_func_no_send(trace_id8);
1080            event!(tracing::Level::INFO, send_trace = trace_id8);
1081        });
1082        let f9 = std::thread::spawn(move || {
1083            traced_func_no_send(trace_id9);
1084            event!(tracing::Level::INFO, send_trace = trace_id9);
1085        });
1086        let f10 = std::thread::spawn(move || {
1087            traced_func_no_send(trace_id10);
1088            event!(tracing::Level::INFO, send_trace = trace_id10);
1089        });
1090        f1.join().unwrap();
1091        f2.join().unwrap();
1092        f3.join().unwrap();
1093        f4.join().unwrap();
1094        f5.join().unwrap();
1095        f6.join().unwrap();
1096        f7.join().unwrap();
1097        f8.join().unwrap();
1098        f9.join().unwrap();
1099        f10.join().unwrap();
1100
1101        ::std::thread::sleep(::std::time::Duration::from_millis(1000));
1102    }
1103
1104    #[test]
1105    fn test_error_span() {
1106        let trace_id = create_unique_id64();
1107        trace_config();
1108        let f3 = std::thread::spawn(move || {
1109            traced_error_func(trace_id);
1110        });
1111        f3.join().unwrap();
1112        ::std::thread::sleep(::std::time::Duration::from_millis(1000));
1113    }
1114
1115    #[test]
1116    fn test_error_span_as_single_event() {
1117        let trace_id = create_unique_id64();
1118        trace_config();
1119        let f4 = std::thread::spawn(move || {
1120            traced_error_func_single_event(trace_id);
1121        });
1122        f4.join().unwrap();
1123        ::std::thread::sleep(::std::time::Duration::from_millis(1000));
1124    }
1125
1126    #[test]
1127    fn test_two_funcs_in_one_span() {
1128        let trace_id = create_unique_id64();
1129        trace_config();
1130        let f5 = std::thread::spawn(move || {
1131            traced_func_no_send(trace_id);
1132            traced_func_no_send(trace_id);
1133            // Send both funcs under one parent span and one trace
1134            event!(tracing::Level::INFO, send_trace = trace_id);
1135        });
1136        f5.join().unwrap();
1137        ::std::thread::sleep(::std::time::Duration::from_millis(1000));
1138    }
1139
1140    #[test]
1141    fn test_one_thread_two_funcs_serial_two_traces() {
1142        let trace_id1 = create_unique_id64();
1143        let trace_id2 = create_unique_id64();
1144        trace_config();
1145        let f7 = std::thread::spawn(move || {
1146            traced_func_no_send(trace_id1);
1147            event!(tracing::Level::INFO, send_trace = trace_id1);
1148
1149            traced_func_no_send(trace_id2);
1150            event!(tracing::Level::INFO, send_trace = trace_id2);
1151        });
1152        f7.join().unwrap();
1153        ::std::thread::sleep(::std::time::Duration::from_millis(1000));
1154    }
1155
1156    #[test]
1157    fn test_http_span() {
1158        let trace_id = create_unique_id64();
1159        trace_config();
1160        let f3 = std::thread::spawn(move || {
1161            traced_http_func(trace_id);
1162        });
1163        f3.join().unwrap();
1164        ::std::thread::sleep(::std::time::Duration::from_millis(1000));
1165    }
1166}