1mod model;
2
3use async_trait::async_trait;
4use futures_util::lock::Mutex;
5use http::{Method, Request, Uri};
6use itertools::Itertools;
7pub use model::Error;
8use opentelemetry::sdk::export::trace;
9use opentelemetry::sdk::export::trace::{SpanData, SpanExporter};
10use opentelemetry::sdk::resource::ResourceDetector;
11use opentelemetry::sdk::resource::SdkProvidedResourceDetector;
12use opentelemetry::sdk::trace::Config;
13use opentelemetry::sdk::trace::Span;
14use opentelemetry::sdk::trace::SpanProcessor;
15use opentelemetry::sdk::Resource;
16use opentelemetry::trace::{SpanId, TraceResult};
17use opentelemetry::trace::{StatusCode, TraceError};
18use opentelemetry::{global, Key};
19use opentelemetry::{sdk, trace::TracerProvider, KeyValue};
20use opentelemetry_http::HttpClient;
21use opentelemetry_semantic_conventions as semcov;
22use prost::Message;
23use std::any::Any;
24use std::collections::BTreeMap;
25use std::convert::TryInto;
26use std::sync::{Arc, RwLock};
27use std::time::{Duration, SystemTime};
28
29use crate::dd_proto;
30
31#[cfg(not(feature = "reqwest-client"))]
32use reqwest as _;
33
/// Base URL used when no endpoint is configured on the pipeline builder.
const DEFAULT_SITE_ENDPOINT: &str = "https://trace.agent.datadoghq.eu/";
/// Path appended to the configured endpoint to form the request URL.
const DEFAULT_DD_TRACES_PATH: &str = "api/v0.2/traces";
/// Value of the `Content-Type` header sent with every export request.
const DEFAULT_DD_CONTENT_TYPE: &str = "application/x-protobuf";
/// Name of the request header that carries the API key.
const DEFAULT_DD_API_KEY_HEADER: &str = "DD-Api-Key";
/// Flush size used when the builder was not given an explicit one.
const DEFAULT_FLUSH_SIZE: usize = 500;

/// Version of this crate, read from Cargo at compile time; reported in the
/// payload as both the tracer version and the agent version.
const VERSION: &str = env!("CARGO_PKG_VERSION");
41
42#[derive(Debug, Clone)]
44#[allow(clippy::module_name_repetitions)]
45pub struct DatadogExporter {
46 client: Arc<dyn HttpClient>,
47 request_url: Uri,
48 service_name: String,
49 env: String,
50 tags: BTreeMap<String, String>,
51 host_name: String,
52 key: String,
53 runtime_id: String,
54 container_id: String,
55 app_version: String,
56 flush_size: usize,
57}
58
59impl DatadogExporter {
60 #[allow(clippy::too_many_arguments)]
61 fn new(
62 service_name: String,
63 request_url: Uri,
64 client: Arc<dyn HttpClient>,
65 key: String,
66 env: String,
67 tags: BTreeMap<String, String>,
68 host_name: String,
69 runtime_id: String,
70 container_id: String,
71 app_version: String,
72 flush_size: usize,
73 ) -> Self {
74 DatadogExporter {
75 client,
76 request_url,
77 service_name,
78 env,
79 tags,
80 host_name,
81 key,
82 runtime_id,
83 container_id,
84 app_version,
85 flush_size,
86 }
87 }
88}
89
90#[must_use]
92pub fn new_pipeline() -> DatadogPipelineBuilder {
93 DatadogPipelineBuilder::default()
94}
95
96#[derive(Debug)]
98pub struct DatadogPipelineBuilder {
99 service_name: Option<String>,
100 agent_endpoint: String,
101 api_key: Option<String>,
102 trace_config: Option<sdk::trace::Config>,
103 client: Option<Arc<dyn HttpClient>>,
104 env: Option<String>,
105 tags: Option<BTreeMap<String, String>>,
106 host_name: Option<String>,
107 runtime_id: Option<String>,
108 container_id: Option<String>,
109 app_version: Option<String>,
110 flush_size: Option<usize>,
111}
112
113impl Default for DatadogPipelineBuilder {
114 fn default() -> Self {
115 DatadogPipelineBuilder {
116 service_name: None,
117 agent_endpoint: DEFAULT_SITE_ENDPOINT.to_string(),
118 trace_config: None,
119 api_key: None,
120 #[cfg(not(feature = "reqwest-client"))]
121 client: None,
122 #[cfg(feature = "reqwest-client")]
123 client: Some(Arc::new(reqwest::Client::new())),
124 env: None,
125 tags: None,
126 host_name: None,
127 runtime_id: None,
128 container_id: None,
129 app_version: None,
130 flush_size: None,
131 }
132 }
133}
134
/// Span processor that keeps ended spans in an in-memory buffer and hands them
/// to the exporter only when asked to flush through [`SpanProcessExt`].
#[derive(Debug)]
#[allow(clippy::type_complexity)]
pub struct WASMWorkerSpanProcessor {
    /// Ended spans waiting to be exported.
    spans: RwLock<Vec<SpanData>>,
    /// Exporter guarded by an async-aware mutex (`futures_util::lock::Mutex`).
    exporter: Arc<Mutex<Box<dyn SpanExporter>>>,
    /// Upper bound on the number of spans drained by one flush.
    flush_size: usize,
}
143
144impl WASMWorkerSpanProcessor {
145 pub(crate) fn new(exporter: Box<dyn SpanExporter>, flush_size: usize) -> Self {
146 WASMWorkerSpanProcessor {
147 spans: RwLock::new(Vec::with_capacity(flush_size)),
148 exporter: Arc::new(Mutex::new(exporter)),
149 flush_size,
150 }
151 }
152}
153
/// Asynchronous counterpart to the synchronous `SpanProcessor::force_flush`.
#[async_trait]
pub trait SpanProcessExt {
    /// Asynchronously flushes pending spans.
    ///
    /// How many spans a single call exports is up to the implementor.
    async fn force_flush(&self) -> TraceResult<()>;
}
158
159#[async_trait]
160impl SpanProcessExt for WASMWorkerSpanProcessor {
161 async fn force_flush(&self) -> TraceResult<()> {
162 let to_export = {
163 let mut lock = match self.spans.write() {
164 Ok(l) => l,
165 Err(e) => {
166 global::handle_error(e);
167 return Err(TraceError::from("unable to obtain lock to flush"));
168 }
169 };
170
171 let export_size = if lock.len() > self.flush_size {
172 self.flush_size
173 } else {
174 lock.len()
175 };
176
177 lock.drain(0..export_size).collect::<Vec<_>>()
178 };
179
180 let mut exporter = self.exporter.lock().await;
181
182 exporter.export(to_export).await
183 }
184}
185
186impl SpanProcessor for WASMWorkerSpanProcessor {
187 fn on_start(&self, _span: &mut Span, _cx: &opentelemetry::Context) {
188 }
190
191 fn on_end(&self, span: SpanData) {
192 let mut lock = match self.spans.write() {
193 Ok(l) => l,
194 Err(e) => {
195 global::handle_error(e);
196 return;
197 }
198 };
199
200 lock.push(span);
201 }
202
203 fn force_flush(&self) -> TraceResult<()> {
204 Err(TraceError::from(
205 "Sync flush is not supported, use `force_flush` from `SpanProcessExt`",
206 ))
207 }
208
209 fn shutdown(&mut self) -> TraceResult<()> {
210 Ok(())
215 }
216
217 fn as_any(&self) -> &dyn Any {
218 self
219 }
220}
221
222impl DatadogPipelineBuilder {
223 pub fn build_exporter(mut self) -> Result<DatadogExporter, TraceError> {
231 let (_, service_name) = self.build_config_and_service_name();
232 self.build_exporter_with_service_name(service_name)
233 }
234
235 fn build_config_and_service_name(&mut self) -> (Config, String) {
236 let service_name = self.service_name.take();
237 if let Some(service_name) = service_name {
238 let config = if let Some(mut cfg) = self.trace_config.take() {
239 cfg.resource = cfg.resource.map(|r| {
240 let without_service_name = r
241 .iter()
242 .filter(|(k, _v)| {
243 **k != Key::new(semcov::resource::SERVICE_NAME.to_string())
244 })
245 .map(|(k, v)| KeyValue::new(k.clone(), v.clone()))
246 .collect::<Vec<KeyValue>>();
247 Arc::new(Resource::new(without_service_name))
248 });
249 cfg
250 } else {
251 Config {
252 resource: Some(Arc::new(Resource::empty())),
253 ..Default::default()
254 }
255 };
256 (config, service_name)
257 } else {
258 let service_name = SdkProvidedResourceDetector
259 .detect(Duration::from_secs(0))
260 .get(Key::new(semcov::resource::SERVICE_NAME.to_string()))
261 .unwrap()
262 .to_string();
263 (
264 Config {
265 resource: Some(Arc::new(Resource::empty())),
267 ..Default::default()
268 },
269 service_name,
270 )
271 }
272 }
273
274 fn build_exporter_with_service_name(
275 self,
276 service_name: String,
277 ) -> Result<DatadogExporter, TraceError> {
278 if let Some(client) = self.client {
279 let endpoint = self.agent_endpoint + DEFAULT_DD_TRACES_PATH;
280 let exporter = DatadogExporter::new(
281 service_name,
282 endpoint.parse().map_err::<Error, _>(Into::into)?,
283 client,
284 self.api_key
285 .ok_or_else(|| TraceError::Other("APIKey not provied".into()))?,
286 self.env.unwrap_or_default(),
287 self.tags.unwrap_or_default(),
288 self.host_name.unwrap_or_default(),
289 self.runtime_id.unwrap_or_default(),
290 self.container_id.unwrap_or_default(),
291 self.app_version.unwrap_or_default(),
292 self.flush_size.unwrap_or(DEFAULT_FLUSH_SIZE),
293 );
294 Ok(exporter)
295 } else {
296 Err(Error::NoHttpClient.into())
297 }
298 }
299
300 pub fn install(
311 mut self,
312 ) -> Result<(sdk::trace::Tracer, sdk::trace::TracerProvider), TraceError> {
313 let (config, service_name) = self.build_config_and_service_name();
314 let exporter = self.build_exporter_with_service_name(service_name)?;
315 let flush_size = exporter.flush_size;
316 let span_processor = WASMWorkerSpanProcessor::new(Box::new(exporter), flush_size);
317 let mut provider_builder =
318 sdk::trace::TracerProvider::builder().with_span_processor(span_processor);
319 provider_builder = provider_builder.with_config(config);
320 let provider = provider_builder.build();
321 let tracer = provider.versioned_tracer(
322 "opentelemetry-datadog-cloudflare",
323 Some(env!("CARGO_PKG_VERSION")),
324 None,
325 );
326
327 Ok((tracer, provider))
328 }
329
330 #[must_use]
332 pub fn with_service_name<T: Into<String>>(mut self, name: T) -> Self {
333 self.service_name = Some(name.into());
334 self
335 }
336
337 #[must_use]
339 pub fn with_endpoint<T: Into<String>>(mut self, endpoint: T) -> Self {
340 self.agent_endpoint = endpoint.into();
341 self
342 }
343
344 #[must_use]
345 pub fn with_api_key<T: Into<String>>(mut self, key: Option<T>) -> Self {
346 self.api_key = key.map(Into::into);
347 self
348 }
349
350 #[must_use]
352 pub fn with_http_client<T: HttpClient + 'static>(
353 mut self,
354 client: Arc<dyn HttpClient>,
355 ) -> Self {
356 self.client = Some(client);
357 self
358 }
359
360 #[must_use]
362 pub fn with_trace_config(mut self, config: sdk::trace::Config) -> Self {
363 self.trace_config = Some(config);
364 self
365 }
366
367 #[must_use]
369 pub fn with_env(mut self, env: String) -> Self {
370 self.env = Some(env);
371 self
372 }
373
374 #[must_use]
376 pub fn with_host_name(mut self, host_name: String) -> Self {
377 self.host_name = Some(host_name);
378 self
379 }
380
381 #[must_use]
383 pub fn with_runtime_id(mut self, runtime_id: String) -> Self {
384 self.runtime_id = Some(runtime_id);
385 self
386 }
387
388 #[must_use]
390 pub fn with_container_id(mut self, container_id: String) -> Self {
391 self.container_id = Some(container_id);
392 self
393 }
394
395 #[must_use]
397 pub fn with_app_version(mut self, app_version: String) -> Self {
398 self.app_version = Some(app_version);
399 self
400 }
401
402 #[must_use]
404 pub fn with_tags(mut self, tags: BTreeMap<String, String>) -> Self {
405 self.tags = Some(tags);
406 self
407 }
408
409 #[must_use]
411 pub fn with_flush_size(mut self, flush_size: usize) -> Self {
412 self.flush_size = Some(flush_size);
413 self
414 }
415}
416
417fn group_into_traces(spans: Vec<SpanData>) -> Vec<Vec<SpanData>> {
418 spans
419 .into_iter()
420 .into_group_map_by(|span_data| span_data.span_context.trace_id()).into_values()
421 .collect()
422}
423
/// Splits a `u128` into its two 64-bit halves, returned as `[high, low]`.
pub(crate) fn u128_to_u64s(n: u128) -> [u64; 2] {
    // Big-endian byte order puts the most significant half first on every
    // platform, so no endianness-dependent swap is needed.
    let be_bytes = n.to_be_bytes();
    let (upper, lower) = be_bytes.split_at(8);

    // Both halves of a 16-byte array split at 8 are exactly 8 bytes long, so
    // the slice-to-array conversions cannot fail.
    let upper: [u8; 8] = upper.try_into().unwrap();
    let lower: [u8; 8] = lower.try_into().unwrap();

    [u64::from_be_bytes(upper), u64::from_be_bytes(lower)]
}
438
439fn trace_into_dd_tracer_payload(exporter: &DatadogExporter, trace: SpanData) -> dd_proto::Span {
440 let trace_id = trace.span_context.trace_id();
441 let span_id: SpanId = trace.span_context.span_id();
442 let span_id = u64::from_be_bytes(span_id.to_bytes());
443 let parent_id = trace.parent_span_id;
444 let parent_id = u64::from_be_bytes(parent_id.to_bytes());
445
446 let resource = trace
447 .attributes
448 .get(&Key::from_static_str("code.namespace"))
449 .map(std::string::ToString::to_string)
450 .unwrap_or_default();
451 let [t0, _t1] = u128_to_u64s(u128::from_be_bytes(trace_id.to_bytes()));
452
453 #[allow(clippy::cast_possible_truncation)]
454 let start = trace
455 .start_time
456 .duration_since(SystemTime::UNIX_EPOCH)
457 .unwrap()
458 .as_nanos() as i64;
459 #[allow(clippy::cast_possible_truncation)]
460 let duration = trace
461 .end_time
462 .duration_since(trace.start_time)
463 .unwrap()
464 .as_nanos() as i64;
465
466 let meta = trace
467 .attributes
468 .into_iter()
469 .map(|(k, v)| (k.to_string(), v.to_string()))
470 .collect::<BTreeMap<String, String>>();
471
472 dd_proto::Span {
473 service: exporter.service_name.clone(),
474 name: trace.name.to_string(),
475 resource,
476 r#type: "http".to_string(),
477 trace_id: t0,
478 span_id,
479 parent_id,
480 error: match trace.status_code {
481 StatusCode::Unset | StatusCode::Ok => 0,
482 StatusCode::Error => 1,
483 },
484 start,
485 duration,
486 meta,
487 metrics: BTreeMap::new(),
488 meta_struct: BTreeMap::new(),
489 }
490}
491
492fn trace_into_chunk(spans: Vec<dd_proto::Span>) -> dd_proto::TraceChunk {
493 dd_proto::TraceChunk {
494 priority: 100i32,
499 origin: "lambda".to_string(),
500 spans,
501 tags: BTreeMap::new(),
502 dropped_trace: false,
503 }
504}
505
506impl DatadogExporter {
507 fn trace_into_tracer(&self, chunks: Vec<dd_proto::TraceChunk>) -> dd_proto::TracerPayload {
508 dd_proto::TracerPayload {
509 container_id: self.container_id.clone(),
510 language_name: "rust".to_string(),
511 language_version: String::new(),
512 tracer_version: VERSION.to_string(),
513 runtime_id: self.runtime_id.clone(),
514 chunks,
515 app_version: self.app_version.clone(),
516 }
517 }
518
519 fn trace_build(&self, tracer: Vec<dd_proto::TracerPayload>) -> dd_proto::TracePayload {
520 dd_proto::TracePayload {
521 host_name: self.host_name.clone(),
522 env: self.env.clone(),
523 traces: vec![],
524 transactions: vec![],
525 tracer_payloads: tracer,
526 tags: self.tags.clone(),
527 agent_version: VERSION.to_string(),
528 target_tps: 1000f64,
529 error_tps: 1000f64,
530 }
531 }
532}
533
534#[async_trait::async_trait]
535impl trace::SpanExporter for DatadogExporter {
536 async fn export(&mut self, batch: Vec<SpanData>) -> trace::ExportResult {
539 let traces: Vec<Vec<SpanData>> = group_into_traces(batch);
540
541 let chunks: Vec<dd_proto::TraceChunk> = traces
542 .into_iter()
543 .map(|spans| {
544 trace_into_chunk(
545 spans
546 .into_iter()
547 .map(|trace| trace_into_dd_tracer_payload(self, trace))
548 .collect(),
549 )
550 })
551 .collect();
552
553 let traces = self.trace_into_tracer(chunks);
554
555 let trace = self.trace_build(vec![traces]);
556 let trace = trace.encode_to_vec();
557
558 let req = Request::builder()
559 .method(Method::POST)
560 .uri(self.request_url.clone())
561 .header(http::header::CONTENT_TYPE, DEFAULT_DD_CONTENT_TYPE)
562 .header("X-Datadog-Reported-Languages", "rust")
563 .header(DEFAULT_DD_API_KEY_HEADER, self.key.clone())
564 .body(trace)
565 .map_err::<Error, _>(Into::into)?;
566
567 if let Err(e) = self.client.send(req).await {
568 return Err(TraceError::from(e.to_string()));
569 }
570
571 Ok(())
572 }
573}