1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
use minitrace::prelude::*;
use rmp_serde::Serializer;
use serde::Serialize;
use std::collections::HashMap;
use std::error::Error;
use std::net::SocketAddr;
#[allow(clippy::too_many_arguments)]
pub fn encode(
service_name: &str,
trace_type: &str,
resource: &str,
error_code: i32,
trace_id: u64,
root_parent_span_id: u64,
span_id_prefix: u32,
spans: &[SpanRecord],
) -> Result<Vec<u8>, Box<dyn Error + Send + Sync + 'static>> {
let spans = spans.iter().map(|s| MPSpan {
name: s.event,
service: service_name,
trace_type,
resource,
start: s.begin_unix_time_ns as i64,
duration: s.duration_ns as i64,
meta: if s.properties.is_empty() {
None
} else {
Some(s.properties.iter().map(|(k, v)| (*k, v.as_ref())).collect())
},
error_code,
span_id: (span_id_prefix as u64) << 32 | s.id as u64,
trace_id,
parent_id: if s.parent_id == 0 {
root_parent_span_id
} else {
(span_id_prefix as u64) << 32 | s.parent_id as u64
},
});
let mut buf = vec![0b10010001];
spans
.collect::<Vec<_>>()
.serialize(&mut Serializer::new(&mut buf).with_struct_map())?;
Ok(buf)
}
/// Posts an encoded trace payload to the agent at `agent`, blocking the
/// calling thread until the response arrives.
///
/// The payload is sent to `http://{agent}/v0.4/traces` with a MessagePack
/// content type.
///
/// # Errors
///
/// Returns an error if the request cannot be sent, or if the response status
/// code is 400 or above; in the latter case the message contains the response
/// body followed by the status.
pub fn report_blocking(
    agent: SocketAddr,
    bytes: Vec<u8>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    let endpoint = format!("http://{}/v0.4/traces", agent);
    let response = reqwest::blocking::Client::new()
        .post(&endpoint)
        .header("Datadog-Meta-Tracer-Version", "v1.27.0")
        .header("Content-Type", "application/msgpack")
        .body(bytes)
        .send()?;

    let status = response.status();
    if status.as_u16() < 400 {
        return Ok(());
    }

    // Reading the body consumes the response, so the status was captured first.
    let detail = response.text()?;
    Err(format!("{} (Status: {})", detail, status).into())
}
/// Posts an encoded trace payload to the agent at `agent` asynchronously.
///
/// The payload is sent to `http://{agent}/v0.4/traces` with a MessagePack
/// content type.
///
/// # Errors
///
/// Returns an error if the request cannot be sent, or if the response status
/// code is 400 or above; in the latter case the message contains the response
/// body followed by the status.
pub async fn report(
    agent: SocketAddr,
    bytes: Vec<u8>,
) -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
    let endpoint = format!("http://{}/v0.4/traces", agent);
    let response = reqwest::Client::new()
        .post(&endpoint)
        .header("Datadog-Meta-Tracer-Version", "v1.27.0")
        .header("Content-Type", "application/msgpack")
        .body(bytes)
        .send()
        .await?;

    let status = response.status();
    if status.as_u16() < 400 {
        return Ok(());
    }

    // Reading the body consumes the response, so the status was captured first.
    let detail = response.text().await?;
    Err(format!("{} (Status: {})", detail, status).into())
}
#[derive(Serialize)]
struct MPSpan<'a> {
name: &'a str,
service: &'a str,
#[serde(rename = "type")]
trace_type: &'a str,
resource: &'a str,
start: i64,
duration: i64,
#[serde(skip_serializing_if = "Option::is_none")]
meta: Option<HashMap<&'a str, &'a str>>,
error_code: i32,
span_id: u64,
trace_id: u64,
parent_id: u64,
}