1use crate::{api::RawSpan, model::Span};
2
3use attohttpc;
4use chrono::{DateTime, Duration, TimeZone, Utc};
5use log::{warn, Level as LogLevel, Log, Record};
6use serde_json::to_string;
7use std::{
8 cell::Cell,
9 collections::{HashMap, VecDeque},
10 sync::{
11 atomic::{AtomicU16, AtomicU32, Ordering},
12 mpsc,
13 },
14};
15
16#[cfg(feature = "json")]
17use log::kv;
18
19#[derive(Clone, Debug)]
21pub struct Config {
22 pub service: String,
24 pub env: Option<String>,
26 pub host: String,
28 pub port: String,
30 pub logging_config: Option<LoggingConfig>,
32 pub apm_config: ApmConfig,
34 pub enable_tracing: bool,
36 pub num_client_send_threads: u32,
38}
39
40impl Default for Config {
41 fn default() -> Self {
42 Config {
43 env: None,
44 host: "localhost".to_string(),
45 port: "8126".to_string(),
46 service: "".to_string(),
47 logging_config: None,
48 apm_config: ApmConfig::default(),
49 enable_tracing: false,
50 num_client_send_threads: 4,
51 }
52 }
53}
54
55#[derive(Clone, Debug)]
56pub struct LoggingConfig {
57 pub level: LogLevel,
58 pub time_format: String,
59 pub mod_filter: Vec<&'static str>,
60 pub body_filter: Vec<&'static str>,
61}
62
63impl Default for LoggingConfig {
64 fn default() -> Self {
65 LoggingConfig {
66 level: LogLevel::Info,
67 time_format: "%Y-%m-%d %H:%M:%S%z".to_string(),
68 mod_filter: Vec::new(),
69 body_filter: Vec::new(),
70 }
71 }
72}
73
74#[derive(Clone, Debug)]
75pub struct ApmConfig {
76 pub apm_enabled: bool,
77 pub sample_priority: f64,
78 pub sample_rate: f64,
79}
80
81impl Default for ApmConfig {
82 fn default() -> Self {
83 ApmConfig {
84 apm_enabled: false,
85 sample_rate: 0f64,
86 sample_priority: 0f64,
87 }
88 }
89}
90
91type TimeInNanos = u64;
92type ThreadId = u32;
93type TraceId = u64;
94type SpanId = u64;
95
96#[derive(Clone, Debug)]
97struct LogRecord {
98 pub thread_id: ThreadId,
99 pub level: log::Level,
100 pub time: DateTime<Utc>,
101 pub msg_str: String,
102 pub module: Option<String>,
103 #[cfg(feature = "json")]
104 pub key_values: HashMap<String, String>,
105}
106
107#[derive(Clone, Debug)]
108enum TraceCommand {
109 Log(LogRecord),
110 NewSpan(TimeInNanos, NewSpanData),
111 Enter(TimeInNanos, ThreadId, SpanId),
112 Exit(TimeInNanos, SpanId),
113 CloseSpan(TimeInNanos, SpanId),
114 Event(
115 TimeInNanos,
116 ThreadId,
117 HashMap<String, String>,
118 DateTime<Utc>,
119 ),
120}
121
122#[derive(Debug, Clone)]
123struct NewSpanData {
124 pub trace_id: TraceId,
125 pub id: SpanId,
126 pub name: String,
127 pub resource: String,
128 pub start: DateTime<Utc>,
129}
130
131#[derive(Clone, Debug)]
132struct SpanCollection {
133 completed_spans: Vec<Span>,
134 parent_span: Span,
135 current_spans: VecDeque<Span>,
136 entered_spans: VecDeque<u64>,
137}
138
139impl SpanCollection {
140 fn new(parent_span: Span) -> Self {
141 SpanCollection {
142 completed_spans: vec![],
143 parent_span,
144 current_spans: VecDeque::new(),
145 entered_spans: VecDeque::new(),
146 }
147 }
148
149 fn start_span(&mut self, span: Span) {
151 let parent_id = Some(self.current_span_id().unwrap_or(self.parent_span.id));
152 self.current_spans.push_back(Span { parent_id, ..span });
153 }
154
155 fn end_span(&mut self, nanos: u64, span_id: SpanId) {
157 let pos = self.current_spans.iter().rposition(|i| i.id == span_id);
158 if let Some(i) = pos {
159 self.current_spans.remove(i).map(|span| {
160 self.completed_spans.push(Span {
161 duration: Duration::nanoseconds(nanos as i64 - span.start.timestamp_nanos()),
162 ..span
163 })
164 });
165 }
166 }
167
168 fn enter_span(&mut self, span_id: SpanId) {
170 self.entered_spans.push_back(span_id);
171 }
172
173 fn exit_span(&mut self, span_id: SpanId) {
175 let pos = self.entered_spans.iter().rposition(|i| *i == span_id);
176 if let Some(i) = pos {
177 self.entered_spans.remove(i);
178 }
179 }
180
181 fn current_span_id(&self) -> Option<u64> {
183 self.entered_spans.back().map(|i| *i)
184 }
185
186 fn add_tag(&mut self, k: String, v: String) {
187 self.current_spans.back_mut().map(|span| {
188 span.tags.insert(k.clone(), v.clone());
189 });
190 self.parent_span.tags.insert(k, v);
191 }
192
193 fn drain_current(mut self) -> Self {
194 std::mem::take(&mut self.current_spans)
195 .into_iter()
196 .for_each(|span| {
197 self.completed_spans.push(Span {
198 duration: Utc::now().signed_duration_since(span.start),
199 ..span
200 })
201 });
202 self
203 }
204
205 fn drain(self, end_time: DateTime<Utc>) -> Vec<Span> {
206 let parent_span = Span {
207 duration: end_time.signed_duration_since(self.parent_span.start.clone()),
208 ..self.parent_span.clone()
209 };
210 let mut ret = self.drain_current().completed_spans;
211 ret.push(parent_span);
212 ret
213 }
214}
215
216struct SpanStorage {
217 traces: HashMap<TraceId, SpanCollection>,
218 spans_to_trace_id: HashMap<SpanId, TraceId>,
219 current_trace_for_thread: HashMap<ThreadId, TraceId>,
220 current_thread_for_trace: HashMap<TraceId, ThreadId>,
221}
222
223impl SpanStorage {
224 fn new() -> Self {
225 SpanStorage {
226 traces: HashMap::new(),
227 spans_to_trace_id: HashMap::new(),
228 current_trace_for_thread: HashMap::new(),
229 current_thread_for_trace: HashMap::new(),
230 }
231 }
232
233 fn start_span(&mut self, span: Span) {
238 let trace_id = span.trace_id;
239 self.spans_to_trace_id.insert(span.id, span.trace_id);
240 if let Some(ss) = self.traces.get_mut(&trace_id) {
241 ss.start_span(span);
242 } else {
243 self.traces.insert(trace_id, SpanCollection::new(span));
244 }
245 }
246
247 fn end_span(&mut self, nanos: u64, span_id: SpanId) {
249 if let Some(trace_id) = self.spans_to_trace_id.remove(&span_id) {
250 if let Some(ref mut ss) = self.traces.get_mut(&trace_id) {
251 ss.end_span(nanos, span_id);
252 }
253 }
254 }
255
256 fn enter_span(&mut self, thread_id: ThreadId, span_id: SpanId) {
259 let t_id = self.spans_to_trace_id.get(&span_id).map(|i| *i);
260 if let Some(trace_id) = t_id {
261 if let Some(ref mut ss) = self.traces.get_mut(&trace_id) {
262 ss.enter_span(span_id);
263 if ss.entered_spans.len() == 1 {
264 self.set_current_trace(thread_id, trace_id);
265 }
266 }
267 }
268 }
269
270 fn exit_span(&mut self, span_id: SpanId) {
272 let trace_id = self.spans_to_trace_id.get(&span_id).cloned();
273 if let Some(trace_id) = trace_id {
274 if let Some(ref mut ss) = self.traces.get_mut(&trace_id) {
275 ss.exit_span(span_id);
276 if ss.entered_spans.is_empty() {
277 self.remove_current_trace(trace_id);
278 }
279 }
280 }
281 }
282
283 fn drain_completed(&mut self, trace_id: TraceId, end: DateTime<Utc>) -> Vec<Span> {
287 if let Some(ss) = self.traces.remove(&trace_id) {
288 ss.drain(end)
289 } else {
290 vec![]
291 }
292 }
293
294 fn span_record_tag(&mut self, trace_id: TraceId, key: String, value: String) {
296 if let Some(ref mut ss) = self.traces.get_mut(&trace_id) {
297 ss.add_tag(key, value)
298 }
299 }
300
301 fn get_trace_id_for_thread(&self, thread_id: ThreadId) -> Option<u64> {
302 self.current_trace_for_thread.get(&thread_id).map(|i| *i)
303 }
304
305 fn set_current_trace(&mut self, thread_id: ThreadId, trace_id: TraceId) {
306 self.current_trace_for_thread.insert(thread_id, trace_id);
307 self.current_thread_for_trace.insert(trace_id, thread_id);
308 }
309
310 fn remove_current_trace(&mut self, trace_id: TraceId) {
311 let thread_id = self.current_thread_for_trace.remove(&trace_id);
312 if let Some(thr) = thread_id {
313 self.current_trace_for_thread.remove(&thr);
314 }
315 }
316
317 fn current_span_id(&self, trace_id: TraceId) -> Option<SpanId> {
319 self.traces.get(&trace_id).and_then(|s| s.current_span_id())
320 }
321}
322
323fn trace_server_loop(
324 client: DdAgentClient,
325 buffer_receiver: mpsc::Receiver<TraceCommand>,
326 log_config: Option<LoggingConfig>,
327) {
328 let mut storage = SpanStorage::new();
329
330 loop {
331 match buffer_receiver.recv() {
332 Ok(TraceCommand::Log(record)) => {
333 if let Some(ref lc) = log_config {
334 let skip = record
335 .module
336 .as_ref()
337 .map(|m: &String| lc.mod_filter.iter().any(|filter| m.contains(*filter)))
338 .unwrap_or(false);
339 let body_skip = lc
340 .body_filter
341 .iter()
342 .filter(|f| record.msg_str.contains(*f))
343 .next()
344 .is_some();
345 if !skip && !body_skip {
346 match storage
347 .get_trace_id_for_thread(record.thread_id)
348 .and_then(|tr_id| {
349 storage.current_span_id(tr_id).map(|sp_id| (tr_id, sp_id))
350 }) {
351 Some((tr, sp)) => {
352 let log_body = build_log_body(&record);
354 println!(
355 "{time} {level} [trace-id:{traceid} span-id:{spanid}] [{module}] {body}",
356 time = record.time.format(lc.time_format.as_ref()),
357 traceid = tr,
358 spanid = sp,
359 level = record.level,
360 module = record.module.unwrap_or("-".to_string()),
361 body = log_body
362 );
363 }
364 _ => {
365 let log_body = build_log_body(&record);
366 println!(
368 "{time} {level} [{module}] {body}",
369 time = record.time.format(lc.time_format.as_ref()),
370 level = record.level,
371 module = record.module.unwrap_or("-".to_string()),
372 body = log_body
373 );
374 }
375 }
376 }
377 }
378 }
379 Ok(TraceCommand::NewSpan(_nanos, data)) => {
380 storage.start_span(Span {
381 id: data.id,
382 trace_id: data.trace_id,
383 tags: HashMap::new(),
384 parent_id: None,
385 start: data.start,
386 name: data.name,
387 resource: data.resource,
388 sql: None,
389 duration: Duration::seconds(0),
390 });
391 }
392 Ok(TraceCommand::Enter(_nanos, thread_id, span_id)) => {
393 storage.enter_span(thread_id, span_id);
394 }
395 Ok(TraceCommand::Exit(_nanos, span_id)) => {
396 storage.exit_span(span_id);
397 }
398 Ok(TraceCommand::Event(_nanos, thread_id, mut event, time)) => {
399 if let Some(send_trace_id) = event
403 .remove("send_trace")
404 .and_then(|t| t.parse::<u64>().ok())
405 {
406 let send_vec = storage.drain_completed(send_trace_id, time);
407 storage.remove_current_trace(send_trace_id);
410 if !send_vec.is_empty() {
411 client.send(send_vec);
412 }
413 }
414 let trace_id_opt = storage.get_trace_id_for_thread(thread_id);
417 if let Some(trace_id) = trace_id_opt {
418 if let Some(type_event) = event.remove("error.etype") {
419 storage.span_record_tag(trace_id, "error.type".to_string(), type_event)
420 }
421 event
422 .into_iter()
423 .for_each(|(k, v)| storage.span_record_tag(trace_id, k, v));
424 }
425 }
426 Ok(TraceCommand::CloseSpan(nanos, span_id)) => {
427 storage.end_span(nanos, span_id);
428 }
429 Err(_) => {
430 return;
431 }
432 }
433 }
434}
435
436fn build_log_body(record: &LogRecord) -> String {
437 #[cfg(not(feature = "json"))]
438 {
439 record.msg_str.clone()
440 }
441 #[cfg(feature = "json")]
442 {
443 if record.key_values.is_empty() {
444 record.msg_str.clone()
445 } else {
446 let mut body = HashMap::new();
447 body.insert("message".to_string(), record.msg_str.clone());
448 for (k, v) in &record.key_values {
449 body.insert(k.clone(), v.clone());
450 }
451 serde_json::to_string(&body).unwrap_or_else(|_| "".to_string())
452 }
453 }
454}
455
456#[derive(Debug, Clone)]
457pub struct DatadogTracing {
458 buffer_sender: mpsc::Sender<TraceCommand>,
459 log_config: Option<LoggingConfig>,
460}
461
462unsafe impl Sync for DatadogTracing {}
463
464impl DatadogTracing {
465 pub fn new(config: Config) -> DatadogTracing {
466 let (buffer_sender, buffer_receiver) = mpsc::channel();
467 let sample_rate = config.apm_config.sample_rate;
468 let client = DdAgentClient::new(&config);
469
470 let log_config = config.logging_config.clone();
471 std::thread::spawn(move || trace_server_loop(client, buffer_receiver, log_config));
472
473 let tracer = DatadogTracing {
474 buffer_sender,
475 log_config: config.logging_config,
476 };
477
478 if let Some(ref lc) = tracer.log_config {
479 let _ = log::set_boxed_logger(Box::new(tracer.clone()));
480 log::set_max_level(lc.level.to_level_filter());
481 }
482 if config.enable_tracing {
483 unsafe {
487 if SAMPLING_RATE.is_none() {
488 SAMPLING_RATE = Some(sample_rate);
489 }
490 }
491
492 tracing::subscriber::set_global_default(tracer.clone()).unwrap_or_else(|_| {
493 warn!(
494 "Global subscriber has already been set! \
495 This should only be set once in the executable."
496 )
497 });
498 }
499 tracer
500 }
501
502 pub fn get_global_sampling_rate() -> f64 {
503 unsafe { SAMPLING_RATE.clone().unwrap_or(0f64) }
504 }
505
506 fn send_log(&self, record: LogRecord) -> Result<(), ()> {
507 self.buffer_sender
508 .send(TraceCommand::Log(record))
509 .map(|_| ())
510 .map_err(|_| ())
511 }
512
513 fn send_new_span(&self, nanos: u64, span: NewSpanData) -> Result<(), ()> {
514 self.buffer_sender
515 .send(TraceCommand::NewSpan(nanos, span))
516 .map(|_| ())
517 .map_err(|_| ())
518 }
519
520 fn send_enter_span(&self, nanos: u64, thread_id: ThreadId, id: SpanId) -> Result<(), ()> {
521 self.buffer_sender
522 .send(TraceCommand::Enter(nanos, thread_id, id))
523 .map(|_| ())
524 .map_err(|_| ())
525 }
526
527 fn send_exit_span(&self, nanos: u64, id: SpanId) -> Result<(), ()> {
528 self.buffer_sender
529 .send(TraceCommand::Exit(nanos, id))
530 .map(|_| ())
531 .map_err(|_| ())
532 }
533
534 fn send_close_span(&self, nanos: u64, span_id: SpanId) -> Result<(), ()> {
535 self.buffer_sender
536 .send(TraceCommand::CloseSpan(nanos, span_id))
537 .map(|_| ())
538 .map_err(|_| ())
539 }
540
541 fn send_event(
542 &self,
543 nanos: u64,
544 thread_id: ThreadId,
545 event: HashMap<String, String>,
546 time: DateTime<Utc>,
547 ) -> Result<(), ()> {
548 self.buffer_sender
549 .send(TraceCommand::Event(nanos, thread_id, event, time))
550 .map(|_| ())
551 .map_err(|_| ())
552 }
553}
554
555fn log_level_to_trace_level(level: log::Level) -> tracing::Level {
556 use log::Level::*;
557 match level {
558 Error => tracing::Level::INFO,
559 Warn => tracing::Level::INFO,
560 Info => tracing::Level::INFO,
561 Debug => tracing::Level::DEBUG,
562 Trace => tracing::Level::TRACE,
563 }
564}
565
566static UNIQUEID_COUNTER: AtomicU16 = AtomicU16::new(0);
567static THREAD_COUNTER: AtomicU32 = AtomicU32::new(0);
568
569static mut SAMPLING_RATE: Option<f64> = None;
570
571thread_local! {
572 static THREAD_ID: ThreadId = THREAD_COUNTER.fetch_add(1, Ordering::Relaxed);
573 static CURRENT_SPAN_ID: Cell<Option<SpanId>> = Cell::new(None);
574}
575
576pub fn get_thread_id() -> ThreadId {
577 THREAD_ID.with(|id| *id)
578}
579
580pub fn get_current_span_id() -> Option<SpanId> {
581 CURRENT_SPAN_ID.with(|id| id.get())
582}
583
584pub fn set_current_span_id(new_id: Option<SpanId>) {
585 CURRENT_SPAN_ID.with(|id| {
586 id.set(new_id);
587 })
588}
589
590pub fn create_unique_id64() -> u64 {
599 let now = Utc::now();
600 let baseline = Utc.timestamp(0, 0);
601
602 let millis_since_epoch: u64 =
603 (now.signed_duration_since(baseline).num_milliseconds() << 16) as u64;
604 millis_since_epoch + UNIQUEID_COUNTER.fetch_add(1, Ordering::Relaxed) as u64
605}
606
607pub struct HashMapVisitor {
608 fields: HashMap<String, String>,
609}
610
611impl HashMapVisitor {
612 fn new() -> Self {
613 HashMapVisitor {
615 fields: HashMap::new(),
616 }
617 }
618 fn add_value(&mut self, field: &tracing::field::Field, value: String) {
619 self.fields.insert(field.name().to_string(), value);
620 }
621}
622
623impl tracing::field::Visit for HashMapVisitor {
624 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
625 self.add_value(field, format!("{}", value));
626 }
627 fn record_bool(&mut self, field: &tracing::field::Field, value: bool) {
628 self.add_value(field, format!("{}", value));
629 }
630 fn record_u64(&mut self, field: &tracing::field::Field, value: u64) {
631 self.add_value(field, format!("{}", value));
632 }
633 fn record_i64(&mut self, field: &tracing::field::Field, value: i64) {
634 self.add_value(field, format!("{}", value));
635 }
636 fn record_debug(&mut self, _field: &tracing::field::Field, _value: &dyn std::fmt::Debug) {
637 }
639}
640
641impl tracing::Subscriber for DatadogTracing {
642 fn enabled(&self, metadata: &tracing::Metadata<'_>) -> bool {
643 match self.log_config {
644 Some(ref lc) => log_level_to_trace_level(lc.level) >= *metadata.level(),
645 None => false,
646 }
647 }
648
649 fn new_span(&self, span: &tracing::span::Attributes<'_>) -> tracing::span::Id {
650 let nanos = Utc::now().timestamp_nanos() as u64;
651 let mut new_span_visitor = HashMapVisitor::new();
652 span.record(&mut new_span_visitor);
653 let trace_id = new_span_visitor
654 .fields
655 .remove("trace_id")
656 .and_then(|s| s.parse::<u64>().ok())
657 .unwrap_or(Utc::now().timestamp_nanos() as u64);
658 let span_id = Utc::now().timestamp_nanos() as u64 + 1;
659 let new_span = NewSpanData {
660 id: span_id,
661 trace_id,
662 start: Utc::now(),
663 resource: span.metadata().target().to_string(),
664 name: span.metadata().name().to_string(),
665 };
666 self.send_new_span(nanos, new_span).unwrap_or(());
667 tracing::span::Id::from_u64(span_id)
668 }
669
670 fn record(&self, _span: &tracing::span::Id, _values: &tracing::span::Record<'_>) {}
671
672 fn record_follows_from(&self, _span: &tracing::span::Id, _follows: &tracing::span::Id) {}
673
674 fn event(&self, event: &tracing::Event<'_>) {
675 let nanos = Utc::now().timestamp_nanos() as u64;
676 let thread_id = get_thread_id();
677 let mut new_evt_visitor = HashMapVisitor::new();
678 event.record(&mut new_evt_visitor);
679
680 self.send_event(nanos, thread_id, new_evt_visitor.fields, Utc::now())
681 .unwrap_or(());
682 }
683
684 fn enter(&self, span: &tracing::span::Id) {
685 let nanos = Utc::now().timestamp_nanos() as u64;
686 let thread_id = get_thread_id();
687 self.send_enter_span(nanos, thread_id, span.clone().into_u64())
688 .unwrap_or(());
689 set_current_span_id(Some(span.into_u64()));
690 }
691
692 fn exit(&self, span: &tracing::span::Id) {
693 let nanos = Utc::now().timestamp_nanos() as u64;
694 self.send_exit_span(nanos, span.clone().into_u64())
695 .unwrap_or(());
696 set_current_span_id(None);
697 }
698
699 fn try_close(&self, span: tracing::span::Id) -> bool {
700 let nanos = Utc::now().timestamp_nanos() as u64;
701 self.send_close_span(nanos, span.into_u64()).unwrap_or(());
702 false
703 }
704}
705
706#[cfg(feature = "json")]
707struct KeyValueMap(HashMap<String, String>);
708
709#[cfg(feature = "json")]
710impl<'kvs> kv::VisitSource<'kvs> for KeyValueMap {
711 fn visit_pair(&mut self, key: kv::Key<'kvs>, value: kv::Value<'kvs>) -> Result<(), kv::Error> {
712 self.0.insert(key.to_string(), value.to_string());
713 Ok(())
714 }
715}
716
717#[cfg(feature = "json")]
718fn build_key_value_map<'a>(record: &Record<'a>) -> HashMap<String, String> {
719 let mut visitor = KeyValueMap(HashMap::new());
720 let visit_result = record.key_values().visit(&mut visitor);
721 if let Err(e) = visit_result {
722 println!("Error building key value map: {:?}", e);
723 }
724
725 visitor.0
726}
727
728impl Log for DatadogTracing {
729 fn enabled(&self, metadata: &log::Metadata) -> bool {
730 if let Some(ref lc) = self.log_config {
731 metadata.level() <= lc.level
732 } else {
733 false
734 }
735 }
736
737 fn log(&self, record: &Record) {
738 if let Some(ref lc) = self.log_config {
739 #[cfg(feature = "json")]
740 let key_values = build_key_value_map(record);
741 if record.level() <= lc.level {
742 let thread_id = get_thread_id();
743 let now = chrono::Utc::now();
744 let msg_str = format!("{}", record.args());
745 let log_rec = LogRecord {
746 thread_id,
747 level: record.level(),
748 time: now,
749 module: record.module_path().map(|s| s.to_string()),
750 msg_str,
751 #[cfg(feature = "json")]
752 key_values,
753 };
754 self.send_log(log_rec).unwrap_or_else(|_| ());
755 }
756 }
757 }
758
759 fn flush(&self) {}
760}
761
762#[derive(Debug, Clone)]
763struct DdAgentClient {
764 client_sender: crossbeam_channel::Sender<Vec<Span>>,
765}
766
767impl DdAgentClient {
768 fn new(config: &Config) -> Self {
769 let (client_sender, client_requests) = crossbeam_channel::unbounded();
770
771 for _ in 0..config.num_client_send_threads {
772 let env = config.env.clone();
773 let service = config.service.clone();
774 let host = config.host.clone();
775 let port = config.port.clone();
776 let apm_config = config.apm_config.clone();
777 let cr_channel = client_requests.clone();
778 std::thread::spawn(move || {
779 DdAgentClient::thread_loop(
780 cr_channel,
781 env,
782 format!("http://{}:{}/v0.3/traces", host, port),
783 service,
784 apm_config,
785 )
786 });
787 }
788 DdAgentClient { client_sender }
789 }
790
791 fn send(&self, stack: Vec<Span>) {
792 self.client_sender.send(stack).unwrap_or_else(|_| {
793 println!("Tracing send error: Channel closed!");
794 });
795 }
796
797 fn thread_loop(
798 client_requests: crossbeam_channel::Receiver<Vec<Span>>,
799 env: Option<String>,
800 endpoint: String,
801 service: String,
802 apm_config: ApmConfig,
803 ) {
804 while let Ok(stack) = client_requests.recv() {
806 let count = stack.len();
807 let spans: Vec<Vec<RawSpan>> = vec![stack
808 .into_iter()
809 .map(|s| RawSpan::from_span(&s, &service, &env, &apm_config))
810 .collect()];
811 match to_string(&spans) {
812 Err(e) => println!("Couldn't encode payload for datadog: {:?}", e),
813 Ok(payload) => {
814 let req = attohttpc::post(&endpoint)
815 .header("Content-Length", payload.len() as u64)
816 .header("Content-Type", "application/json")
817 .header("X-Datadog-Trace-Count", count)
818 .text(&payload);
819
820 match req.send() {
821 Ok(resp) if !resp.is_success() => {
822 println!("error from datadog agent: {:?}", resp)
823 }
824 Err(err) => println!("error sending traces to datadog: {:?}", err),
825 _ => {}
826 }
827 }
828 }
829 }
830 }
831}
832
833#[cfg(test)]
834mod tests {
835 use super::*;
836 use log::{debug, info, Level};
837 use tracing::{event, span};
838
839 fn long_call(trace_id: u64) {
840 let span = span!(tracing::Level::INFO, "long_call", trace_id = trace_id);
841 let _e = span.enter();
842 debug!("Waiting on I/O {}", trace_id);
843 sleep_call(trace_id);
844 info!("I/O Finished {}", trace_id);
845 }
846
847 fn sleep_call(trace_id: u64) {
848 let span = span!(tracing::Level::INFO, "sleep_call", trace_id = trace_id);
849 let _e = span.enter();
850 debug!("Long call {}", trace_id);
851 debug!(
852 "Current thread ID/span ID: {}/{:?}",
853 get_thread_id(),
854 get_current_span_id()
855 );
856 std::thread::sleep(std::time::Duration::from_millis(2000));
857 }
858
859 fn traced_func_no_send(trace_id: u64) {
860 let span = span!(
861 tracing::Level::INFO,
862 "traced_func_no_send",
863 trace_id = trace_id
864 );
865 let _e = span.enter();
866 debug!(
867 "Performing some function for id={}/{:?}",
868 trace_id,
869 get_current_span_id()
870 );
871 long_call(trace_id);
872 }
873
874 fn traced_http_func(trace_id: u64) {
875 let span = span!(
876 tracing::Level::INFO,
877 "traced_http_func",
878 trace_id = trace_id
879 );
880 let _e = span.enter();
881 debug!(
882 "Performing some function for id={}/{:?}",
883 trace_id,
884 get_current_span_id()
885 );
886 long_call(trace_id);
887 event!(
888 tracing::Level::INFO,
889 http.url = "http://test.test/",
890 http.status_code = "200",
891 http.method = "GET"
892 );
893 event!(tracing::Level::INFO, send_trace = trace_id);
894 }
895
896 fn traced_error_func(trace_id: u64) {
897 let span = span!(
898 tracing::Level::INFO,
899 "traced_error_func",
900 trace_id = trace_id
901 );
902 let _e = span.enter();
903 debug!(
904 "Performing some function for id={}/{:?}",
905 trace_id,
906 get_current_span_id()
907 );
908 long_call(trace_id);
909 event!(
910 tracing::Level::ERROR,
911 error.etype = "",
912 error.message = "Test error"
913 );
914 event!(
915 tracing::Level::ERROR,
916 http.url = "http://test.test/",
917 http.status_code = "400",
918 http.method = "GET"
919 );
920 event!(
921 tracing::Level::ERROR,
922 custom_tag = "good",
923 custom_tag2 = "test",
924 send_trace = trace_id
925 );
926 }
927
928 fn traced_error_func_single_event(trace_id: u64) {
929 let span = span!(
930 tracing::Level::INFO,
931 "traced_error_func_single_event",
932 trace_id = trace_id
933 );
934 let _e = span.enter();
935
936 debug!(
937 "Performing some function for id={}/{:?}",
938 trace_id,
939 get_current_span_id()
940 );
941 long_call(trace_id);
942 event!(
943 tracing::Level::ERROR,
944 send_trace = trace_id,
945 error.etype = "",
946 error.message = "Test error",
947 http.url = "http://test.test/",
948 http.status_code = "400",
949 http.method = "GET",
950 custom_tag = "good",
951 custom_tag2 = "test"
952 );
953 }
954
955 fn trace_config() {
956 let config = Config {
957 service: String::from("datadog_apm_test"),
958 env: Some("staging-01".into()),
959 logging_config: Some(LoggingConfig {
960 level: Level::Trace,
961 mod_filter: vec!["hyper", "mime"],
962 ..LoggingConfig::default()
963 }),
964 enable_tracing: true,
965 ..Default::default()
966 };
967 let _client = DatadogTracing::new(config);
968 }
969
970 #[test]
971 fn test_exit_child_span() {
972 trace_config();
973 let trace_id = 1u64;
974
975 let f1 = std::thread::spawn(move || {
976 let span = span!(tracing::Level::INFO, "parent_span", trace_id = trace_id);
977 let _e = span.enter();
978 info!("Inside parent_span, should print trace and span ID");
979 {
980 let span = span!(tracing::Level::INFO, "child_span", trace_id = trace_id);
981 let _e = span.enter();
982 info!("Inside child_span, should print trace and span ID");
983 }
984 info!("Back in parent_span, should print trace and span ID");
985 });
986 f1.join().unwrap();
987 event!(tracing::Level::INFO, send_trace = trace_id);
988 ::std::thread::sleep(::std::time::Duration::from_millis(1000));
989 }
990
991 #[test]
992 fn test_trace_one_func_stack() {
993 let trace_id = create_unique_id64();
994 trace_config();
995
996 debug!(
997 "Outside of span, this should be None: {:?}",
998 get_current_span_id()
999 );
1000 debug!(
1001 "Sampling rate is {}",
1002 DatadogTracing::get_global_sampling_rate()
1003 );
1004
1005 let f1 = std::thread::spawn(move || {
1006 traced_func_no_send(trace_id);
1007 event!(tracing::Level::INFO, send_trace = trace_id);
1008 });
1009
1010 debug!(
1011 "Same as before span, after span completes, this should be None: {:?}",
1012 get_current_span_id()
1013 );
1014 f1.join().unwrap();
1015 ::std::thread::sleep(::std::time::Duration::from_millis(1000));
1016 }
1017
1018 #[test]
1019 fn test_parallel_two_threads_two_traces() {
1020 let trace_id1 = create_unique_id64();
1021 let trace_id2 = create_unique_id64();
1022 trace_config();
1023 let f1 = std::thread::spawn(move || {
1024 traced_func_no_send(trace_id1);
1025 event!(tracing::Level::INFO, send_trace = trace_id1);
1026 });
1027 let f2 = std::thread::spawn(move || {
1028 traced_func_no_send(trace_id2);
1029 event!(tracing::Level::INFO, send_trace = trace_id2);
1030 });
1031
1032 f1.join().unwrap();
1033 f2.join().unwrap();
1034 ::std::thread::sleep(::std::time::Duration::from_millis(1000));
1035 }
1036
1037 #[test]
1038 fn test_parallel_two_threads_ten_traces() {
1039 let trace_id1 = create_unique_id64();
1040 let trace_id2 = create_unique_id64() + 1;
1041 let trace_id3 = create_unique_id64() + 2;
1042 let trace_id4 = create_unique_id64() + 3;
1043 let trace_id5 = create_unique_id64() + 4;
1044 let trace_id6 = create_unique_id64() + 5;
1045 let trace_id7 = create_unique_id64() + 6;
1046 let trace_id8 = create_unique_id64() + 7;
1047 let trace_id9 = create_unique_id64() + 8;
1048 let trace_id10 = create_unique_id64() + 9;
1049 trace_config();
1050 let f1 = std::thread::spawn(move || {
1051 traced_func_no_send(trace_id1);
1052 event!(tracing::Level::INFO, send_trace = trace_id1);
1053 });
1054 let f2 = std::thread::spawn(move || {
1055 traced_func_no_send(trace_id2);
1056 event!(tracing::Level::INFO, send_trace = trace_id2);
1057 });
1058 let f3 = std::thread::spawn(move || {
1059 traced_func_no_send(trace_id3);
1060 event!(tracing::Level::INFO, send_trace = trace_id3);
1061 });
1062 let f4 = std::thread::spawn(move || {
1063 traced_func_no_send(trace_id4);
1064 event!(tracing::Level::INFO, send_trace = trace_id4);
1065 });
1066 let f5 = std::thread::spawn(move || {
1067 traced_func_no_send(trace_id5);
1068 event!(tracing::Level::INFO, send_trace = trace_id5);
1069 });
1070 let f6 = std::thread::spawn(move || {
1071 traced_func_no_send(trace_id6);
1072 event!(tracing::Level::INFO, send_trace = trace_id6);
1073 });
1074 let f7 = std::thread::spawn(move || {
1075 traced_func_no_send(trace_id7);
1076 event!(tracing::Level::INFO, send_trace = trace_id7);
1077 });
1078 let f8 = std::thread::spawn(move || {
1079 traced_func_no_send(trace_id8);
1080 event!(tracing::Level::INFO, send_trace = trace_id8);
1081 });
1082 let f9 = std::thread::spawn(move || {
1083 traced_func_no_send(trace_id9);
1084 event!(tracing::Level::INFO, send_trace = trace_id9);
1085 });
1086 let f10 = std::thread::spawn(move || {
1087 traced_func_no_send(trace_id10);
1088 event!(tracing::Level::INFO, send_trace = trace_id10);
1089 });
1090 f1.join().unwrap();
1091 f2.join().unwrap();
1092 f3.join().unwrap();
1093 f4.join().unwrap();
1094 f5.join().unwrap();
1095 f6.join().unwrap();
1096 f7.join().unwrap();
1097 f8.join().unwrap();
1098 f9.join().unwrap();
1099 f10.join().unwrap();
1100
1101 ::std::thread::sleep(::std::time::Duration::from_millis(1000));
1102 }
1103
1104 #[test]
1105 fn test_error_span() {
1106 let trace_id = create_unique_id64();
1107 trace_config();
1108 let f3 = std::thread::spawn(move || {
1109 traced_error_func(trace_id);
1110 });
1111 f3.join().unwrap();
1112 ::std::thread::sleep(::std::time::Duration::from_millis(1000));
1113 }
1114
1115 #[test]
1116 fn test_error_span_as_single_event() {
1117 let trace_id = create_unique_id64();
1118 trace_config();
1119 let f4 = std::thread::spawn(move || {
1120 traced_error_func_single_event(trace_id);
1121 });
1122 f4.join().unwrap();
1123 ::std::thread::sleep(::std::time::Duration::from_millis(1000));
1124 }
1125
1126 #[test]
1127 fn test_two_funcs_in_one_span() {
1128 let trace_id = create_unique_id64();
1129 trace_config();
1130 let f5 = std::thread::spawn(move || {
1131 traced_func_no_send(trace_id);
1132 traced_func_no_send(trace_id);
1133 event!(tracing::Level::INFO, send_trace = trace_id);
1135 });
1136 f5.join().unwrap();
1137 ::std::thread::sleep(::std::time::Duration::from_millis(1000));
1138 }
1139
1140 #[test]
1141 fn test_one_thread_two_funcs_serial_two_traces() {
1142 let trace_id1 = create_unique_id64();
1143 let trace_id2 = create_unique_id64();
1144 trace_config();
1145 let f7 = std::thread::spawn(move || {
1146 traced_func_no_send(trace_id1);
1147 event!(tracing::Level::INFO, send_trace = trace_id1);
1148
1149 traced_func_no_send(trace_id2);
1150 event!(tracing::Level::INFO, send_trace = trace_id2);
1151 });
1152 f7.join().unwrap();
1153 ::std::thread::sleep(::std::time::Duration::from_millis(1000));
1154 }
1155
1156 #[test]
1157 fn test_http_span() {
1158 let trace_id = create_unique_id64();
1159 trace_config();
1160 let f3 = std::thread::spawn(move || {
1161 traced_http_func(trace_id);
1162 });
1163 f3.join().unwrap();
1164 ::std::thread::sleep(::std::time::Duration::from_millis(1000));
1165 }
1166}