opentelemetry_datadog/exporter/model/
mod.rs1use crate::exporter::ModelConfig;
2use http::uri;
3use opentelemetry_sdk::{
4 trace::{self, SpanData},
5 ExportError, Resource,
6};
7use std::fmt::Debug;
8use url::ParseError;
9
10use self::unified_tags::UnifiedTags;
11
12use super::Mapping;
13
14pub mod unified_tags;
15mod v03;
16mod v05;
17
/// The literal key `_sampling_priority_v1`.
///
/// NOTE(review): nothing in this module's non-test code reads this static; it is
/// presumably consumed by the `v03`/`v05` encoders — confirm there.
static SAMPLING_PRIORITY_KEY: &str = "_sampling_priority_v1";
22
/// The literal key `_dd.measured`.
///
/// NOTE(review): nothing in this module's visible code reads this static; it is
/// presumably consumed by the `v03`/`v05` encoders — confirm there.
static DD_MEASURED_KEY: &str = "_dd.measured";
25
26pub type FieldMappingFn = dyn for<'a> Fn(&'a SpanData, &'a ModelConfig) -> &'a str + Send + Sync;
62
63pub(crate) type FieldMapping = std::sync::Arc<FieldMappingFn>;
64
65fn default_service_name_mapping<'a>(_span: &'a SpanData, config: &'a ModelConfig) -> &'a str {
69 config.service_name.as_str()
70}
71
72fn default_name_mapping<'a>(span: &'a SpanData, _config: &'a ModelConfig) -> &'a str {
73 span.instrumentation_scope.name()
74}
75
76fn default_resource_mapping<'a>(span: &'a SpanData, _config: &'a ModelConfig) -> &'a str {
77 span.name.as_ref()
78}
79
80#[derive(Debug, thiserror::Error)]
82pub enum Error {
83 #[error("message pack error")]
85 MessagePackError,
86 #[error("http client must be set, users can enable reqwest or surf feature to use http client implementation within create")]
88 NoHttpClient,
89 #[error(transparent)]
91 RequestError(#[from] http::Error),
92 #[error("invalid url {0}")]
94 InvalidUri(String),
95 #[error("{0}")]
97 Other(String),
98}
99
100impl ExportError for Error {
101 fn exporter_name(&self) -> &'static str {
102 "datadog"
103 }
104}
105
106impl From<rmp::encode::ValueWriteError> for Error {
107 fn from(_: rmp::encode::ValueWriteError) -> Self {
108 Self::MessagePackError
109 }
110}
111
112impl From<url::ParseError> for Error {
113 fn from(err: ParseError) -> Self {
114 Self::InvalidUri(err.to_string())
115 }
116}
117
118impl From<uri::InvalidUri> for Error {
119 fn from(err: uri::InvalidUri) -> Self {
120 Self::InvalidUri(err.to_string())
121 }
122}
123
/// Trace API versions the exporter can encode for.
///
/// The enum is `#[non_exhaustive]`, so code outside this crate must keep a wildcard arm
/// when matching on it.
#[derive(Clone, Copy, Debug)]
#[non_exhaustive]
pub enum ApiVersion {
    /// The v0.3 trace API.
    Version03,
    /// The v0.5 trace API.
    Version05,
}
133
134impl ApiVersion {
135 pub(crate) fn path(self) -> &'static str {
136 match self {
137 ApiVersion::Version03 => "/v0.3/traces",
138 ApiVersion::Version05 => "/v0.5/traces",
139 }
140 }
141
142 pub(crate) fn content_type(self) -> &'static str {
143 match self {
144 ApiVersion::Version03 => "application/msgpack",
145 ApiVersion::Version05 => "application/msgpack",
146 }
147 }
148
149 pub(crate) fn encode(
150 self,
151 model_config: &ModelConfig,
152 traces: Vec<&[trace::SpanData]>,
153 mapping: &Mapping,
154 unified_tags: &UnifiedTags,
155 resource: Option<&Resource>,
156 ) -> Result<Vec<u8>, Error> {
157 match self {
158 Self::Version03 => v03::encode(
159 model_config,
160 traces,
161 |span, config| match &mapping.service_name {
162 Some(f) => f(span, config),
163 None => default_service_name_mapping(span, config),
164 },
165 |span, config| match &mapping.name {
166 Some(f) => f(span, config),
167 None => default_name_mapping(span, config),
168 },
169 |span, config| match &mapping.resource {
170 Some(f) => f(span, config),
171 None => default_resource_mapping(span, config),
172 },
173 resource,
174 ),
175 Self::Version05 => v05::encode(
176 model_config,
177 traces,
178 |span, config| match &mapping.service_name {
179 Some(f) => f(span, config),
180 None => default_service_name_mapping(span, config),
181 },
182 |span, config| match &mapping.name {
183 Some(f) => f(span, config),
184 None => default_name_mapping(span, config),
185 },
186 |span, config| match &mapping.resource {
187 Some(f) => f(span, config),
188 None => default_resource_mapping(span, config),
189 },
190 unified_tags,
191 resource,
192 ),
193 }
194 }
195}
196
#[cfg(test)]
pub(crate) mod tests {
    use super::*;
    use base64::{engine::general_purpose::STANDARD, Engine};
    use opentelemetry::InstrumentationScope;
    use opentelemetry::{
        trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
        KeyValue,
    };
    use opentelemetry_sdk::{
        self,
        trace::{SpanEvents, SpanLinks},
    };
    use std::time::{Duration, SystemTime};

    /// A single trace holding one span (trace id 7, parent span id 1, span id 99).
    fn get_traces() -> Vec<Vec<trace::SpanData>> {
        vec![vec![get_span(7, 1, 99)]]
    }

    /// Builds a fixed client span for encoder tests.
    ///
    /// Apart from the three ids, every field is constant: the span starts at the Unix
    /// epoch and lasts one second, is named `"resource"`, carries the single attribute
    /// `span.type = "web"`, has status `Ok`, no events or links, and an instrumentation
    /// scope named `"component"`.
    pub(crate) fn get_span(trace_id: u128, parent_span_id: u64, span_id: u64) -> trace::SpanData {
        let span_context = SpanContext::new(
            TraceId::from_u128(trace_id),
            SpanId::from_u64(span_id),
            TraceFlags::default(),
            false,
            TraceState::default(),
        );

        let start_time = SystemTime::UNIX_EPOCH;
        let end_time = start_time.checked_add(Duration::from_secs(1)).unwrap();

        let attributes = vec![KeyValue::new("span.type", "web")];
        let events = SpanEvents::default();
        let links = SpanLinks::default();
        let instrumentation_scope = InstrumentationScope::builder("component").build();

        trace::SpanData {
            span_context,
            parent_span_id: SpanId::from_u64(parent_span_id),
            span_kind: SpanKind::Client,
            name: "resource".into(),
            start_time,
            end_time,
            attributes,
            dropped_attributes_count: 0,
            events,
            links,
            status: Status::Ok,
            instrumentation_scope,
        }
    }

    /// The v0.3 payload for the fixture trace must match the golden base64 value.
    #[test]
    fn test_encode_v03() -> Result<(), Box<dyn std::error::Error>> {
        let traces = get_traces();
        let model_config = ModelConfig {
            service_name: "service_name".to_string(),
            ..Default::default()
        };
        let resource = Resource::builder_empty()
            .with_attribute(KeyValue::new("host.name", "test"))
            .build();
        let encoded = STANDARD.encode(ApiVersion::Version03.encode(
            &model_config,
            traces.iter().map(|x| &x[..]).collect(),
            &Mapping::empty(),
            &UnifiedTags::new(),
            Some(&resource),
        )?);

        assert_eq!(encoded.as_str(), "kZGMpHR5cGWjd2Vip3NlcnZpY2Wsc2VydmljZV9uYW1lpG5hbWWpY29tcG9uZW\
        50qHJlc291cmNlqHJlc291cmNlqHRyYWNlX2lkzwAAAAAAAAAHp3NwYW5faWTPAAAAAAAAAGOpcGFyZW50X2lkzwAAAA\
        AAAAABpXN0YXJ00wAAAAAAAAAAqGR1cmF0aW9u0wAAAAA7msoApWVycm9y0gAAAACkbWV0YYKpaG9zdC5uYW1lpHRlc3\
        Spc3Bhbi50eXBlo3dlYqdtZXRyaWNzgbVfc2FtcGxpbmdfcHJpb3JpdHlfdjHLAAAAAAAAAAA=");

        Ok(())
    }

    /// The v0.5 encoder must succeed and emit a non-empty payload for the fixture trace.
    #[test]
    fn test_encode_v05() -> Result<(), Box<dyn std::error::Error>> {
        let traces = get_traces();
        let model_config = ModelConfig {
            service_name: "service_name".to_string(),
            ..Default::default()
        };
        // `builder_empty()` keeps the resource limited to the attribute set here, matching
        // `test_encode_v03`; `builder()` would also pull in SDK/environment detectors.
        let resource = Resource::builder_empty()
            .with_attribute(KeyValue::new("host.name", "test"))
            .build();

        let mut unified_tags = UnifiedTags::new();
        unified_tags.set_env(Some(String::from("test-env")));
        unified_tags.set_version(Some(String::from("test-version")));
        unified_tags.set_service(Some(String::from("test-service")));

        let payload = ApiVersion::Version05.encode(
            &model_config,
            traces.iter().map(|x| &x[..]).collect(),
            &Mapping::empty(),
            &unified_tags,
            Some(&resource),
        )?;

        // TODO: pin the exact payload with a golden value, as `test_encode_v03` does.
        assert!(
            !payload.is_empty(),
            "v0.5 encoder produced an empty payload"
        );

        Ok(())
    }
}