opentelemetry_datadog_cloudflare/exporter/mod.rs

mod model;

use async_trait::async_trait;
use futures_util::lock::Mutex;
use http::{Method, Request, Uri};
use itertools::Itertools;
pub use model::Error;
use opentelemetry::sdk::export::trace;
use opentelemetry::sdk::export::trace::{SpanData, SpanExporter};
use opentelemetry::sdk::resource::ResourceDetector;
use opentelemetry::sdk::resource::SdkProvidedResourceDetector;
use opentelemetry::sdk::trace::Config;
use opentelemetry::sdk::trace::Span;
use opentelemetry::sdk::trace::SpanProcessor;
use opentelemetry::sdk::Resource;
use opentelemetry::trace::{SpanId, TraceResult};
use opentelemetry::trace::{StatusCode, TraceError};
use opentelemetry::{global, Key};
use opentelemetry::{sdk, trace::TracerProvider, KeyValue};
use opentelemetry_http::HttpClient;
use opentelemetry_semantic_conventions as semcov;
use prost::Message;
use std::any::Any;
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime};

use crate::dd_proto;

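// Mark `reqwest` as used even when the `reqwest-client` feature is disabled, so the
// dependency does not trigger unused-crate warnings.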
#[cfg(not(feature = "reqwest-client"))]
use reqwest as _;

const DEFAULT_SITE_ENDPOINT: &str = "https://trace.agent.datadoghq.eu/";
const DEFAULT_DD_TRACES_PATH: &str = "api/v0.2/traces";
const DEFAULT_DD_CONTENT_TYPE: &str = "application/x-protobuf";
const DEFAULT_DD_API_KEY_HEADER: &str = "DD-Api-Key";
const DEFAULT_FLUSH_SIZE: usize = 500;

const VERSION: &str = env!("CARGO_PKG_VERSION");

/// Datadog span exporter.
#[derive(Debug, Clone)]
#[allow(clippy::module_name_repetitions)]
pub struct DatadogExporter {
    client: Arc<dyn HttpClient>,
    request_url: Uri,
    service_name: String,
    env: String,
    tags: BTreeMap<String, String>,
    host_name: String,
    key: String,
    runtime_id: String,
    container_id: String,
    app_version: String,
    flush_size: usize,
}

impl DatadogExporter {
    #[allow(clippy::too_many_arguments)]
    fn new(
        service_name: String,
        request_url: Uri,
        client: Arc<dyn HttpClient>,
        key: String,
        env: String,
        tags: BTreeMap<String, String>,
        host_name: String,
        runtime_id: String,
        container_id: String,
        app_version: String,
        flush_size: usize,
    ) -> Self {
        DatadogExporter {
            client,
            request_url,
            service_name,
            env,
            tags,
            host_name,
            key,
            runtime_id,
            container_id,
            app_version,
            flush_size,
        }
    }
}

/// Create a new Datadog exporter pipeline builder.
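///
/// A minimal sketch of building an exporter by hand; the service name and API key shown
/// here are placeholders, and the default HTTP client assumes the `reqwest-client` feature:
///
/// ```ignore
/// let exporter = new_pipeline()
///     .with_service_name("my-worker")
///     .with_api_key(Some("<DD API key>"))
///     .build_exporter()?;
/// ```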
#[must_use]
pub fn new_pipeline() -> DatadogPipelineBuilder {
    DatadogPipelineBuilder::default()
}

/// Builder for the Datadog exporter pipeline.
#[derive(Debug)]
pub struct DatadogPipelineBuilder {
    service_name: Option<String>,
    agent_endpoint: String,
    api_key: Option<String>,
    trace_config: Option<sdk::trace::Config>,
    client: Option<Arc<dyn HttpClient>>,
    env: Option<String>,
    tags: Option<BTreeMap<String, String>>,
    host_name: Option<String>,
    runtime_id: Option<String>,
    container_id: Option<String>,
    app_version: Option<String>,
    flush_size: Option<usize>,
}

impl Default for DatadogPipelineBuilder {
    fn default() -> Self {
        DatadogPipelineBuilder {
            service_name: None,
            agent_endpoint: DEFAULT_SITE_ENDPOINT.to_string(),
            trace_config: None,
            api_key: None,
            #[cfg(not(feature = "reqwest-client"))]
            client: None,
            #[cfg(feature = "reqwest-client")]
            client: Some(Arc::new(reqwest::Client::new())),
            env: None,
            tags: None,
            host_name: None,
            runtime_id: None,
            container_id: None,
            app_version: None,
            flush_size: None,
        }
    }
}

/// A [`SpanProcessor`] that buffers finished spans and exports them asynchronously when
/// explicitly flushed.
#[derive(Debug)]
#[allow(clippy::type_complexity)]
pub struct WASMWorkerSpanProcessor {
    spans: RwLock<Vec<SpanData>>,
    exporter: Arc<Mutex<Box<dyn SpanExporter>>>,
    flush_size: usize,
}

impl WASMWorkerSpanProcessor {
    pub(crate) fn new(exporter: Box<dyn SpanExporter>, flush_size: usize) -> Self {
        WASMWorkerSpanProcessor {
            spans: RwLock::new(Vec::with_capacity(flush_size)),
            exporter: Arc::new(Mutex::new(exporter)),
            flush_size,
        }
    }
}

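/// Asynchronous flushing for the [`WASMWorkerSpanProcessor`], since the synchronous
/// [`SpanProcessor::force_flush`] cannot await the export inside a Worker.
///
/// A sketch of how a caller might drain buffered spans after handling a request, given the
/// `provider` returned by [`DatadogPipelineBuilder::install`]; it assumes the SDK provider
/// exposes its processors via `span_processors()`:
///
/// ```ignore
/// for processor in provider.span_processors() {
///     if let Some(wasm) = processor.as_any().downcast_ref::<WASMWorkerSpanProcessor>() {
///         wasm.force_flush().await?;
///     }
/// }
/// ```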
#[async_trait]
pub trait SpanProcessExt {
    async fn force_flush(&self) -> TraceResult<()>;
}

#[async_trait]
impl SpanProcessExt for WASMWorkerSpanProcessor {
    async fn force_flush(&self) -> TraceResult<()> {
        let to_export = {
            let mut lock = match self.spans.write() {
                Ok(l) => l,
                Err(e) => {
                    global::handle_error(e);
                    return Err(TraceError::from("unable to obtain lock to flush"));
                }
            };

            let export_size = if lock.len() > self.flush_size {
                self.flush_size
            } else {
                lock.len()
            };

            lock.drain(0..export_size).collect::<Vec<_>>()
        };

        let mut exporter = self.exporter.lock().await;

        exporter.export(to_export).await
    }
}

impl SpanProcessor for WASMWorkerSpanProcessor {
    fn on_start(&self, _span: &mut Span, _cx: &opentelemetry::Context) {
        // Ignored
    }

    fn on_end(&self, span: SpanData) {
        let mut lock = match self.spans.write() {
            Ok(l) => l,
            Err(e) => {
                global::handle_error(e);
                return;
            }
        };

        lock.push(span);
    }

    fn force_flush(&self) -> TraceResult<()> {
        Err(TraceError::from(
            "Sync flush is not supported, use `force_flush` from `SpanProcessExt`",
        ))
    }

    fn shutdown(&mut self) -> TraceResult<()> {
        // We ignore shutdown since we run inside a Worker: either the worker terminates and
        // takes the processor with it, or the processor keeps running.
        //
        // TODO: handle this properly later.
        Ok(())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl DatadogPipelineBuilder {
    /// Build a new exporter.
    ///
    /// This is useful if you are manually constructing a pipeline.
    ///
    /// # Errors
    ///
    /// If the endpoint or the `APIKey` is not properly set.
    pub fn build_exporter(mut self) -> Result<DatadogExporter, TraceError> {
        let (_, service_name) = self.build_config_and_service_name();
        self.build_exporter_with_service_name(service_name)
    }

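    /// Resolve the trace [`Config`] and the service name, removing any `service.name`
    /// resource attribute so the exporter's own service name takes precedence.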
    fn build_config_and_service_name(&mut self) -> (Config, String) {
        let service_name = self.service_name.take();
        if let Some(service_name) = service_name {
            let config = if let Some(mut cfg) = self.trace_config.take() {
                cfg.resource = cfg.resource.map(|r| {
                    let without_service_name = r
                        .iter()
                        .filter(|(k, _v)| {
                            **k != Key::new(semcov::resource::SERVICE_NAME.to_string())
                        })
                        .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
                        .collect::<Vec<KeyValue>>();
                    Arc::new(Resource::new(without_service_name))
                });
                cfg
            } else {
                Config {
                    resource: Some(Arc::new(Resource::empty())),
                    ..Default::default()
                }
            };
            (config, service_name)
        } else {
            let service_name = SdkProvidedResourceDetector
                .detect(Duration::from_secs(0))
                .get(Key::new(semcov::resource::SERVICE_NAME.to_string()))
                .unwrap()
                .to_string();
            (
                Config {
                    // Use an empty resource to prevent the TracerProvider from assigning a
                    // service name of its own.
                    resource: Some(Arc::new(Resource::empty())),
                    ..Default::default()
                },
                service_name,
            )
        }
    }

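    /// Build the [`DatadogExporter`] once the service name has been resolved.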
    fn build_exporter_with_service_name(
        self,
        service_name: String,
    ) -> Result<DatadogExporter, TraceError> {
        if let Some(client) = self.client {
            let endpoint = self.agent_endpoint + DEFAULT_DD_TRACES_PATH;
            let exporter = DatadogExporter::new(
                service_name,
                endpoint.parse().map_err::<Error, _>(Into::into)?,
                client,
                self.api_key
                    .ok_or_else(|| TraceError::Other("APIKey not provided".into()))?,
                self.env.unwrap_or_default(),
                self.tags.unwrap_or_default(),
                self.host_name.unwrap_or_default(),
                self.runtime_id.unwrap_or_default(),
                self.container_id.unwrap_or_default(),
                self.app_version.unwrap_or_default(),
                self.flush_size.unwrap_or(DEFAULT_FLUSH_SIZE),
            );
            Ok(exporter)
        } else {
            Err(Error::NoHttpClient.into())
        }
    }

    /// Install the Datadog worker trace exporter pipeline, buffering spans in a
    /// [`WASMWorkerSpanProcessor`] until they are flushed.
    ///
    /// # Errors
    ///
    /// If the endpoint or the `APIKey` is not properly set.
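    ///
    /// # Example
    ///
    /// A sketch of the lifecycle in a worker; the service name, API key, and span name are
    /// placeholders, and spans are flushed via [`SpanProcessExt::force_flush`]:
    ///
    /// ```ignore
    /// let (tracer, provider) = new_pipeline()
    ///     .with_service_name("my-worker")
    ///     .with_api_key(Some("<DD API key>"))
    ///     .install()?;
    ///
    /// tracer.in_span("handle_request", |_cx| {
    ///     // ... handle the request ...
    /// });
    ///
    /// // Spans stay buffered until the processor is flushed (see `SpanProcessExt`).
    /// ```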
    pub fn install(
        mut self,
    ) -> Result<(sdk::trace::Tracer, sdk::trace::TracerProvider), TraceError> {
        let (config, service_name) = self.build_config_and_service_name();
        let exporter = self.build_exporter_with_service_name(service_name)?;
        let flush_size = exporter.flush_size;
        let span_processor = WASMWorkerSpanProcessor::new(Box::new(exporter), flush_size);
        let mut provider_builder =
            sdk::trace::TracerProvider::builder().with_span_processor(span_processor);
        provider_builder = provider_builder.with_config(config);
        let provider = provider_builder.build();
        let tracer = provider.versioned_tracer(
            "opentelemetry-datadog-cloudflare",
            Some(env!("CARGO_PKG_VERSION")),
            None,
        );

        Ok((tracer, provider))
    }

    /// Assign the service name under which to group traces
    #[must_use]
    pub fn with_service_name<T: Into<String>>(mut self, name: T) -> Self {
        self.service_name = Some(name.into());
        self
    }

    /// Assign the Datadog trace endpoint
    #[must_use]
    pub fn with_endpoint<T: Into<String>>(mut self, endpoint: T) -> Self {
        self.agent_endpoint = endpoint.into();
        self
    }

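    /// Assign the Datadog API key, sent with each export in the `DD-Api-Key` header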
    #[must_use]
    pub fn with_api_key<T: Into<String>>(mut self, key: Option<T>) -> Self {
        self.api_key = key.map(Into::into);
        self
    }

    /// Choose the HTTP client used by the uploader
    #[must_use]
    pub fn with_http_client<T: HttpClient + 'static>(
        mut self,
        client: Arc<dyn HttpClient>,
    ) -> Self {
        self.client = Some(client);
        self
    }

    /// Assign the SDK trace configuration
    #[must_use]
    pub fn with_trace_config(mut self, config: sdk::trace::Config) -> Self {
        self.trace_config = Some(config);
        self
    }

    /// Assign the env
    #[must_use]
    pub fn with_env(mut self, env: String) -> Self {
        self.env = Some(env);
        self
    }

    /// Assign the `host_name`
    #[must_use]
    pub fn with_host_name(mut self, host_name: String) -> Self {
        self.host_name = Some(host_name);
        self
    }

    /// Assign the `runtime_id`
    #[must_use]
    pub fn with_runtime_id(mut self, runtime_id: String) -> Self {
        self.runtime_id = Some(runtime_id);
        self
    }

    /// Assign the `container_id`
    #[must_use]
    pub fn with_container_id(mut self, container_id: String) -> Self {
        self.container_id = Some(container_id);
        self
    }

    /// Assign the `app_version`
    #[must_use]
    pub fn with_app_version(mut self, app_version: String) -> Self {
        self.app_version = Some(app_version);
        self
    }

    /// Assign the tags
    #[must_use]
    pub fn with_tags(mut self, tags: BTreeMap<String, String>) -> Self {
        self.tags = Some(tags);
        self
    }

    /// Assign the flush size, i.e. the maximum number of spans exported per flush
    #[must_use]
    pub fn with_flush_size(mut self, flush_size: usize) -> Self {
        self.flush_size = Some(flush_size);
        self
    }
}

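/// Group exported spans by trace id, so each resulting `Vec` holds the spans of a single trace.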
fn group_into_traces(spans: Vec<SpanData>) -> Vec<Vec<SpanData>> {
    spans
        .into_iter()
        .into_group_map_by(|span_data| span_data.span_context.trace_id())
        .into_values()
        .collect()
}

/// Helper function which should be rewritten, as we only need a `u64` for the `TraceID`.
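///
/// A worked example with a hypothetical value: the high 64 bits come first, then the low.
///
/// ```ignore
/// let [high, low] = u128_to_u64s(0x0123_4567_89AB_CDEF_1111_2222_3333_4444);
/// assert_eq!(high, 0x0123_4567_89AB_CDEF);
/// assert_eq!(low, 0x1111_2222_3333_4444);
/// ```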
pub(crate) fn u128_to_u64s(n: u128) -> [u64; 2] {
    let bytes = n.to_ne_bytes();
    let (mut high, mut low) = bytes.split_at(8);

    if cfg!(target_endian = "little") {
        std::mem::swap(&mut high, &mut low);
    }

    [
        u64::from_ne_bytes(high.try_into().unwrap()),
        u64::from_ne_bytes(low.try_into().unwrap()),
    ]
}

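/// Convert an OpenTelemetry [`SpanData`] into a Datadog protobuf span: timestamps become
/// Unix-epoch nanoseconds, attributes become `meta` string tags, the `code.namespace`
/// attribute (when present) becomes the Datadog resource name, and the 128-bit trace id is
/// reduced to a single `u64`.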
fn trace_into_dd_tracer_payload(exporter: &DatadogExporter, trace: SpanData) -> dd_proto::Span {
    let trace_id = trace.span_context.trace_id();
    let span_id: SpanId = trace.span_context.span_id();
    let span_id = u64::from_be_bytes(span_id.to_bytes());
    let parent_id = trace.parent_span_id;
    let parent_id = u64::from_be_bytes(parent_id.to_bytes());

    let resource = trace
        .attributes
        .get(&Key::from_static_str("code.namespace"))
        .map(std::string::ToString::to_string)
        .unwrap_or_default();
    let [t0, _t1] = u128_to_u64s(u128::from_be_bytes(trace_id.to_bytes()));

    #[allow(clippy::cast_possible_truncation)]
    let start = trace
        .start_time
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap()
        .as_nanos() as i64;
    #[allow(clippy::cast_possible_truncation)]
    let duration = trace
        .end_time
        .duration_since(trace.start_time)
        .unwrap()
        .as_nanos() as i64;

    let meta = trace
        .attributes
        .into_iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect::<BTreeMap<String, String>>();

    dd_proto::Span {
        service: exporter.service_name.clone(),
        name: trace.name.to_string(),
        resource,
        r#type: "http".to_string(),
        trace_id: t0,
        span_id,
        parent_id,
        error: match trace.status_code {
            StatusCode::Unset | StatusCode::Ok => 0,
            StatusCode::Error => 1,
        },
        start,
        duration,
        meta,
        metrics: BTreeMap::new(),
        meta_struct: BTreeMap::new(),
    }
}

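/// Wrap the converted spans of a single trace into a Datadog trace chunk.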
fn trace_into_chunk(spans: Vec<dd_proto::Span>) -> dd_proto::TraceChunk {
    dd_proto::TraceChunk {
        // The Datadog trace-agent defaults the sampling priority to 1 when it is not set
        // (https://github.com/DataDog/datadog-agent/blob/eac2327/pkg/trace/sampler/sampler.go#L54-L55),
        // and does the same for OTLP originated traces
        // (https://github.com/DataDog/datadog-agent/blob/3ea2eb4/pkg/trace/api/otlp.go#L309).
        // Here we hard-code a positive priority so the chunk is always kept.
        priority: 100i32,
        origin: "lambda".to_string(),
        spans,
        tags: BTreeMap::new(),
        dropped_trace: false,
    }
}

impl DatadogExporter {
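    /// Wrap the trace chunks into a `TracerPayload` carrying this exporter's runtime metadata.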
    fn trace_into_tracer(&self, chunks: Vec<dd_proto::TraceChunk>) -> dd_proto::TracerPayload {
        dd_proto::TracerPayload {
            container_id: self.container_id.clone(),
            language_name: "rust".to_string(),
            language_version: String::new(),
            tracer_version: VERSION.to_string(),
            runtime_id: self.runtime_id.clone(),
            chunks,
            app_version: self.app_version.clone(),
        }
    }

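    /// Build the top-level `TracePayload` sent to the Datadog intake, carrying the host name,
    /// env, and tags configured on this exporter.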
    fn trace_build(&self, tracer: Vec<dd_proto::TracerPayload>) -> dd_proto::TracePayload {
        dd_proto::TracePayload {
            host_name: self.host_name.clone(),
            env: self.env.clone(),
            traces: vec![],
            transactions: vec![],
            tracer_payloads: tracer,
            tags: self.tags.clone(),
            agent_version: VERSION.to_string(),
            target_tps: 1000f64,
            error_tps: 1000f64,
        }
    }
}

#[async_trait::async_trait]
impl trace::SpanExporter for DatadogExporter {
    /// Export spans to Datadog.
    // TODO: split and batch the payload when it gets too big; see the Vector implementation
    // for reference.
    async fn export(&mut self, batch: Vec<SpanData>) -> trace::ExportResult {
        let traces: Vec<Vec<SpanData>> = group_into_traces(batch);

        let chunks: Vec<dd_proto::TraceChunk> = traces
            .into_iter()
            .map(|spans| {
                trace_into_chunk(
                    spans
                        .into_iter()
                        .map(|trace| trace_into_dd_tracer_payload(self, trace))
                        .collect(),
                )
            })
            .collect();

        let tracer_payload = self.trace_into_tracer(chunks);

        let trace = self.trace_build(vec![tracer_payload]);
        let trace = trace.encode_to_vec();

        let req = Request::builder()
            .method(Method::POST)
            .uri(self.request_url.clone())
            .header(http::header::CONTENT_TYPE, DEFAULT_DD_CONTENT_TYPE)
            .header("X-Datadog-Reported-Languages", "rust")
            .header(DEFAULT_DD_API_KEY_HEADER, self.key.clone())
            .body(trace)
            .map_err::<Error, _>(Into::into)?;

        if let Err(e) = self.client.send(req).await {
            return Err(TraceError::from(e.to_string()));
        }

        Ok(())
    }
}