opentelemetry_datadog/exporter/model/
mod.rs

1use crate::exporter::ModelConfig;
2use http::uri;
3use opentelemetry_sdk::{
4    trace::{self, SpanData},
5    ExportError, Resource,
6};
7use std::fmt::Debug;
8use url::ParseError;
9
10use self::unified_tags::UnifiedTags;
11
12use super::Mapping;
13
14pub mod unified_tags;
15mod v03;
16mod v05;
17
18// todo: we should follow the same mapping defined in https://github.com/DataDog/datadog-agent/blob/main/pkg/trace/api/otlp.go
19
/// Datadog sampling-priority key, named as in dd-trace-js:
/// <https://github.com/DataDog/dd-trace-js/blob/c89a35f7d27beb4a60165409376e170eacb194c5/packages/dd-trace/src/constants.js#L4>
const SAMPLING_PRIORITY_KEY: &str = "_sampling_priority_v1";
22
/// Datadog "measured" key, named as in the datadog-agent:
/// <https://github.com/DataDog/datadog-agent/blob/ec96f3c24173ec66ba235bda7710504400d9a000/pkg/trace/traceutil/span.go#L20>
const DD_MEASURED_KEY: &str = "_dd.measured";
25
/// Custom mapping between opentelemetry spans and datadog spans.
///
/// Users can provide a custom function to change the mapping. It currently supports customizing
/// the following fields in the Datadog span protocol.
///
/// |field name|default value|
/// |---------------|-------------|
/// |service name| service name configuration from [`ModelConfig`]|
/// |name | opentelemetry instrumentation library name |
/// |resource| opentelemetry name|
///
/// The function takes a reference to [`SpanData`] and a reference to [`ModelConfig`] as parameters.
/// It should return a `&str`, borrowed from either argument, which will be used as the value for
/// the field.
///
/// If no custom mapping is provided, the default mapping detailed above is used.
///
/// For example,
/// ```no_run
/// use opentelemetry::global;
/// use opentelemetry_datadog::{ApiVersion, new_pipeline};
///
/// fn main() -> Result<(), opentelemetry_sdk::trace::TraceError> {
///     let provider = new_pipeline()
///         .with_service_name("my_app")
///         .with_api_version(ApiVersion::Version05)
///         // the custom mapping below renames every span to "datadog spans"
///         .with_name_mapping(|span, model_config|{"datadog spans"})
///         .with_agent_endpoint("http://localhost:8126")
///         .install_batch()?;
///     global::set_tracer_provider(provider.clone());
///     let tracer = global::tracer("opentelemetry-datadog-demo");
///
///     Ok(())
/// }
/// ```
pub type FieldMappingFn = dyn for<'a> Fn(&'a SpanData, &'a ModelConfig) -> &'a str + Send + Sync;
62
/// A [`FieldMappingFn`] held behind an [`std::sync::Arc`], so one mapping function can be shared
/// by cloning the handle.
pub(crate) type FieldMapping = std::sync::Arc<FieldMappingFn>;
64
// Datadog uses some magic tags in its models. There is no recommended mapping defined in the
// opentelemetry spec. Below are the default mappings we use. Users can override them by providing
// their own implementations.
68fn default_service_name_mapping<'a>(_span: &'a SpanData, config: &'a ModelConfig) -> &'a str {
69    config.service_name.as_str()
70}
71
72fn default_name_mapping<'a>(span: &'a SpanData, _config: &'a ModelConfig) -> &'a str {
73    span.instrumentation_scope.name()
74}
75
76fn default_resource_mapping<'a>(span: &'a SpanData, _config: &'a ModelConfig) -> &'a str {
77    span.name.as_ref()
78}
79
80/// Wrap type for errors from opentelemetry datadog exporter
81#[derive(Debug, thiserror::Error)]
82pub enum Error {
83    /// Message pack error
84    #[error("message pack error")]
85    MessagePackError,
86    /// No http client founded. User should provide one or enable features
87    #[error("http client must be set, users can enable reqwest or surf feature to use http client implementation within create")]
88    NoHttpClient,
89    /// Http requests failed with following errors
90    #[error(transparent)]
91    RequestError(#[from] http::Error),
92    /// The Uri was invalid
93    #[error("invalid url {0}")]
94    InvalidUri(String),
95    /// Other errors
96    #[error("{0}")]
97    Other(String),
98}
99
100impl ExportError for Error {
101    fn exporter_name(&self) -> &'static str {
102        "datadog"
103    }
104}
105
106impl From<rmp::encode::ValueWriteError> for Error {
107    fn from(_: rmp::encode::ValueWriteError) -> Self {
108        Self::MessagePackError
109    }
110}
111
112impl From<url::ParseError> for Error {
113    fn from(err: ParseError) -> Self {
114        Self::InvalidUri(err.to_string())
115    }
116}
117
118impl From<uri::InvalidUri> for Error {
119    fn from(err: uri::InvalidUri) -> Self {
120        Self::InvalidUri(err.to_string())
121    }
122}
123
/// Datadog trace ingestion API versions this exporter can target.
#[non_exhaustive]
#[derive(Clone, Copy, Debug)]
pub enum ApiVersion {
    /// Trace API version 0.3.
    Version03,
    /// Trace API version 0.5; requires datadog-agent v7.22.0 or above.
    Version05,
}
133
134impl ApiVersion {
135    pub(crate) fn path(self) -> &'static str {
136        match self {
137            ApiVersion::Version03 => "/v0.3/traces",
138            ApiVersion::Version05 => "/v0.5/traces",
139        }
140    }
141
142    pub(crate) fn content_type(self) -> &'static str {
143        match self {
144            ApiVersion::Version03 => "application/msgpack",
145            ApiVersion::Version05 => "application/msgpack",
146        }
147    }
148
149    pub(crate) fn encode(
150        self,
151        model_config: &ModelConfig,
152        traces: Vec<&[trace::SpanData]>,
153        mapping: &Mapping,
154        unified_tags: &UnifiedTags,
155        resource: Option<&Resource>,
156    ) -> Result<Vec<u8>, Error> {
157        match self {
158            Self::Version03 => v03::encode(
159                model_config,
160                traces,
161                |span, config| match &mapping.service_name {
162                    Some(f) => f(span, config),
163                    None => default_service_name_mapping(span, config),
164                },
165                |span, config| match &mapping.name {
166                    Some(f) => f(span, config),
167                    None => default_name_mapping(span, config),
168                },
169                |span, config| match &mapping.resource {
170                    Some(f) => f(span, config),
171                    None => default_resource_mapping(span, config),
172                },
173                resource,
174            ),
175            Self::Version05 => v05::encode(
176                model_config,
177                traces,
178                |span, config| match &mapping.service_name {
179                    Some(f) => f(span, config),
180                    None => default_service_name_mapping(span, config),
181                },
182                |span, config| match &mapping.name {
183                    Some(f) => f(span, config),
184                    None => default_name_mapping(span, config),
185                },
186                |span, config| match &mapping.resource {
187                    Some(f) => f(span, config),
188                    None => default_resource_mapping(span, config),
189                },
190                unified_tags,
191                resource,
192            ),
193        }
194    }
195}
196
197#[cfg(test)]
198pub(crate) mod tests {
199    use super::*;
200    use base64::{engine::general_purpose::STANDARD, Engine};
201    use opentelemetry::InstrumentationScope;
202    use opentelemetry::{
203        trace::{SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState},
204        KeyValue,
205    };
206    use opentelemetry_sdk::{
207        self,
208        trace::{SpanEvents, SpanLinks},
209    };
210    use std::time::{Duration, SystemTime};
211
212    fn get_traces() -> Vec<Vec<trace::SpanData>> {
213        vec![vec![get_span(7, 1, 99)]]
214    }
215
216    pub(crate) fn get_span(trace_id: u128, parent_span_id: u64, span_id: u64) -> trace::SpanData {
217        let span_context = SpanContext::new(
218            TraceId::from_u128(trace_id),
219            SpanId::from_u64(span_id),
220            TraceFlags::default(),
221            false,
222            TraceState::default(),
223        );
224
225        let start_time = SystemTime::UNIX_EPOCH;
226        let end_time = start_time.checked_add(Duration::from_secs(1)).unwrap();
227
228        let attributes = vec![KeyValue::new("span.type", "web")];
229        let events = SpanEvents::default();
230        let links = SpanLinks::default();
231        let instrumentation_scope = InstrumentationScope::builder("component").build();
232
233        trace::SpanData {
234            span_context,
235            parent_span_id: SpanId::from_u64(parent_span_id),
236            span_kind: SpanKind::Client,
237            name: "resource".into(),
238            start_time,
239            end_time,
240            attributes,
241            dropped_attributes_count: 0,
242            events,
243            links,
244            status: Status::Ok,
245            instrumentation_scope,
246        }
247    }
248
249    #[test]
250    fn test_encode_v03() -> Result<(), Box<dyn std::error::Error>> {
251        let traces = get_traces();
252        let model_config = ModelConfig {
253            service_name: "service_name".to_string(),
254            ..Default::default()
255        };
256        let resource = Resource::builder_empty()
257            .with_attribute(KeyValue::new("host.name", "test"))
258            .build();
259        let encoded = STANDARD.encode(ApiVersion::Version03.encode(
260            &model_config,
261            traces.iter().map(|x| &x[..]).collect(),
262            &Mapping::empty(),
263            &UnifiedTags::new(),
264            Some(&resource),
265        )?);
266
267        assert_eq!(encoded.as_str(), "kZGMpHR5cGWjd2Vip3NlcnZpY2Wsc2VydmljZV9uYW1lpG5hbWWpY29tcG9uZW\
268        50qHJlc291cmNlqHJlc291cmNlqHRyYWNlX2lkzwAAAAAAAAAHp3NwYW5faWTPAAAAAAAAAGOpcGFyZW50X2lkzwAAAA\
269        AAAAABpXN0YXJ00wAAAAAAAAAAqGR1cmF0aW9u0wAAAAA7msoApWVycm9y0gAAAACkbWV0YYKpaG9zdC5uYW1lpHRlc3\
270        Spc3Bhbi50eXBlo3dlYqdtZXRyaWNzgbVfc2FtcGxpbmdfcHJpb3JpdHlfdjHLAAAAAAAAAAA=");
271
272        Ok(())
273    }
274
275    #[test]
276    fn test_encode_v05() -> Result<(), Box<dyn std::error::Error>> {
277        let traces = get_traces();
278        let model_config = ModelConfig {
279            service_name: "service_name".to_string(),
280            ..Default::default()
281        };
282        let resource = Resource::builder()
283            .with_attribute(KeyValue::new("host.name", "test"))
284            .build();
285
286        let mut unified_tags = UnifiedTags::new();
287        unified_tags.set_env(Some(String::from("test-env")));
288        unified_tags.set_version(Some(String::from("test-version")));
289        unified_tags.set_service(Some(String::from("test-service")));
290
291        let _encoded = STANDARD.encode(ApiVersion::Version05.encode(
292            &model_config,
293            traces.iter().map(|x| &x[..]).collect(),
294            &Mapping::empty(),
295            &unified_tags,
296            Some(&resource),
297        )?);
298
299        // TODO: Need someone to generate the expected result or instructions to do so.
300        // assert_eq!(encoded.as_str(), "kp6jd2VirHNlcnZpY2VfbmFtZaljb21wb25lbnSocmVzb3VyY2WpaG9zdC5uYW\
301        // 1lpHRlc3Snc2VydmljZax0ZXN0LXNlcnZpY2WjZW52qHRlc3QtZW52p3ZlcnNpb26sdGVzdC12ZXJzaW9uqXNwYW4udH\
302        // lwZbVfc2FtcGxpbmdfcHJpb3JpdHlfdjGRkZzOAAAAAc4AAAACzgAAAAPPAAAAAAAAAAfPAAAAAAAAAGPPAAAAAAAAAA\
303        // HTAAAAAAAAAADTAAAAADuaygDSAAAAAIXOAAAABM4AAAAFzgAAAAbOAAAAB84AAAAIzgAAAAnOAAAACs4AAAALzgAAAA\
304        // zOAAAAAIHOAAAADcsAAAAAAAAAAM4AAAAA");
305
306        Ok(())
307    }
308}