opentelemetry_stackdriver/
lib.rs

1#![cfg(not(doctest))]
2// unfortunately the proto code includes comments from the google proto files
3// that are interpreted as "doc tests" and will fail to build.
4// When this PR is merged we should be able to remove this attribute:
5// https://github.com/danburkert/prost/pull/291
6#![allow(
7    clippy::doc_lazy_continuation,
8    deprecated,
9    rustdoc::bare_urls,
10    rustdoc::broken_intra_doc_links,
11    rustdoc::invalid_rust_codeblocks
12)]
13
14use std::{
15    borrow::Cow,
16    collections::HashMap,
17    fmt,
18    future::Future,
19    sync::{
20        atomic::{AtomicUsize, Ordering},
21        Arc, RwLock,
22    },
23    time::{Duration, Instant},
24};
25
26use async_trait::async_trait;
27use futures_core::future::BoxFuture;
28use futures_util::stream::StreamExt;
29use opentelemetry::{
30    otel_error,
31    trace::{SpanId, TraceError},
32    Key, KeyValue, Value,
33};
34use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
35use opentelemetry_sdk::{
36    trace::{SpanData, SpanExporter},
37    Resource,
38};
39use opentelemetry_semantic_conventions as semconv;
40use thiserror::Error;
41#[cfg(feature = "gcp-authorizer")]
42use tonic::metadata::MetadataValue;
43use tonic::{
44    transport::{Channel, ClientTlsConfig},
45    Code, Request,
46};
47
48#[allow(clippy::derive_partial_eq_without_eq)] // tonic doesn't derive Eq for generated types
49pub mod proto;
50
51#[cfg(feature = "propagator")]
52pub mod google_trace_context_propagator;
53
54use proto::devtools::cloudtrace::v2::span::time_event::Annotation;
55use proto::devtools::cloudtrace::v2::span::{
56    Attributes, Link, Links, SpanKind, TimeEvent, TimeEvents,
57};
58use proto::devtools::cloudtrace::v2::trace_service_client::TraceServiceClient;
59use proto::devtools::cloudtrace::v2::{
60    AttributeValue, BatchWriteSpansRequest, Span, TruncatableString,
61};
62use proto::logging::v2::{
63    log_entry::Payload, logging_service_v2_client::LoggingServiceV2Client, LogEntry,
64    LogEntrySourceLocation, WriteLogEntriesRequest,
65};
66use proto::rpc::Status;
67
68/// Exports opentelemetry tracing spans to Google StackDriver.
69///
70/// As of the time of this writing, the opentelemetry crate exposes no link information
71/// so this struct does not send link information.
72#[derive(Clone)]
73pub struct StackDriverExporter {
74    tx: futures_channel::mpsc::Sender<Vec<SpanData>>,
75    pending_count: Arc<AtomicUsize>,
76    maximum_shutdown_duration: Duration,
77    resource: Arc<RwLock<Option<Resource>>>,
78}
79
80impl StackDriverExporter {
81    pub fn builder() -> Builder {
82        Builder::default()
83    }
84
85    pub fn pending_count(&self) -> usize {
86        self.pending_count.load(Ordering::Relaxed)
87    }
88}
89
90impl SpanExporter for StackDriverExporter {
91    fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
92        match self.tx.try_send(batch) {
93            Err(e) => Box::pin(std::future::ready(Err(OTelSdkError::InternalFailure(
94                format!("{:?}", e),
95            )))),
96            Ok(()) => {
97                self.pending_count.fetch_add(1, Ordering::Relaxed);
98                Box::pin(std::future::ready(Ok(())))
99            }
100        }
101    }
102
103    fn shutdown(&mut self) -> OTelSdkResult {
104        let start = Instant::now();
105        while (Instant::now() - start) < self.maximum_shutdown_duration && self.pending_count() > 0
106        {
107            std::thread::yield_now();
108            // Spin for a bit and give the inner export some time to upload, with a timeout.
109        }
110        Ok(())
111    }
112
113    fn set_resource(&mut self, resource: &Resource) {
114        match self.resource.write() {
115            Ok(mut guard) => *guard = Some(resource.clone()),
116            Err(poisoned) => *poisoned.into_inner() = Some(resource.clone()),
117        }
118    }
119}
120
121impl fmt::Debug for StackDriverExporter {
122    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123        #[allow(clippy::unneeded_field_pattern)]
124        let Self {
125            tx: _,
126            pending_count,
127            maximum_shutdown_duration,
128            resource: _,
129        } = self;
130        f.debug_struct("StackDriverExporter")
131            .field("tx", &"(elided)")
132            .field("pending_count", pending_count)
133            .field("maximum_shutdown_duration", maximum_shutdown_duration)
134            .finish()
135    }
136}
137
138/// Helper type to build a `StackDriverExporter`.
139#[derive(Clone, Default)]
140pub struct Builder {
141    maximum_shutdown_duration: Option<Duration>,
142    num_concurrent_requests: Option<usize>,
143    log_context: Option<LogContext>,
144}
145
146impl Builder {
147    /// Set the maximum shutdown duration to export all the remaining data.
148    ///
149    /// If not set, defaults to 5 seconds.
150    pub fn maximum_shutdown_duration(mut self, duration: Duration) -> Self {
151        self.maximum_shutdown_duration = Some(duration);
152        self
153    }
154
155    /// Set the number of concurrent requests.
156    ///
157    /// If `num_concurrent_requests` is set to `0` or `None` then no limit is enforced.
158    pub fn num_concurrent_requests(mut self, num_concurrent_requests: usize) -> Self {
159        self.num_concurrent_requests = Some(num_concurrent_requests);
160        self
161    }
162
163    /// Enable writing log entries with the given `log_context`.
164    pub fn log_context(mut self, log_context: LogContext) -> Self {
165        self.log_context = Some(log_context);
166        self
167    }
168
169    pub async fn build<A: Authorizer>(
170        self,
171        authenticator: A,
172    ) -> Result<(StackDriverExporter, impl Future<Output = ()>), Error>
173    where
174        Error: From<A::Error>,
175    {
176        let Self {
177            maximum_shutdown_duration,
178            num_concurrent_requests,
179            log_context,
180        } = self;
181        let uri = http::uri::Uri::from_static("https://cloudtrace.googleapis.com:443");
182
183        #[cfg(all(feature = "tls-native-roots", not(feature = "tls-webpki-roots")))]
184        let tls_config = ClientTlsConfig::new().with_native_roots();
185        #[cfg(feature = "tls-webpki-roots")]
186        let tls_config = ClientTlsConfig::new().with_webpki_roots();
187        #[cfg(not(any(feature = "tls-native-roots", feature = "tls-webpki-roots")))]
188        let tls_config = ClientTlsConfig::new();
189
190        let trace_channel = Channel::builder(uri)
191            .tls_config(tls_config.clone())
192            .map_err(|e| Error::Transport(e.into()))?
193            .connect()
194            .await
195            .map_err(|e| Error::Transport(e.into()))?;
196
197        let log_client = match log_context {
198            Some(log_context) => {
199                let log_channel = Channel::builder(http::uri::Uri::from_static(
200                    "https://logging.googleapis.com:443",
201                ))
202                .tls_config(tls_config)
203                .map_err(|e| Error::Transport(e.into()))?
204                .connect()
205                .await
206                .map_err(|e| Error::Transport(e.into()))?;
207
208                Some(LogClient {
209                    client: LoggingServiceV2Client::new(log_channel),
210                    context: Arc::new(InternalLogContext::from(log_context)),
211                })
212            }
213            None => None,
214        };
215
216        let (tx, rx) = futures_channel::mpsc::channel(64);
217        let pending_count = Arc::new(AtomicUsize::new(0));
218        let scopes = Arc::new(match log_client {
219            Some(_) => vec![TRACE_APPEND, LOGGING_WRITE],
220            None => vec![TRACE_APPEND],
221        });
222
223        let count_clone = pending_count.clone();
224        let resource = Arc::new(RwLock::new(None));
225        let ctx_resource = resource.clone();
226        let future = async move {
227            let trace_client = TraceServiceClient::new(trace_channel);
228            let authorizer = &authenticator;
229            let log_client = log_client.clone();
230            rx.for_each_concurrent(num_concurrent_requests, move |batch| {
231                let trace_client = trace_client.clone();
232                let log_client = log_client.clone();
233                let pending_count = count_clone.clone();
234                let scopes = scopes.clone();
235                let resource = ctx_resource.clone();
236                ExporterContext {
237                    trace_client,
238                    log_client,
239                    authorizer,
240                    pending_count,
241                    scopes,
242                    resource,
243                }
244                .export(batch)
245            })
246            .await
247        };
248
249        let exporter = StackDriverExporter {
250            tx,
251            pending_count,
252            maximum_shutdown_duration: maximum_shutdown_duration
253                .unwrap_or_else(|| Duration::from_secs(5)),
254            resource,
255        };
256
257        Ok((exporter, future))
258    }
259}
260
261struct ExporterContext<'a, A> {
262    trace_client: TraceServiceClient<Channel>,
263    log_client: Option<LogClient>,
264    authorizer: &'a A,
265    pending_count: Arc<AtomicUsize>,
266    scopes: Arc<Vec<&'static str>>,
267    resource: Arc<RwLock<Option<Resource>>>,
268}
269
270impl<A: Authorizer> ExporterContext<'_, A>
271where
272    Error: From<A::Error>,
273{
274    async fn export(mut self, batch: Vec<SpanData>) {
275        use proto::devtools::cloudtrace::v2::span::time_event::Value;
276
277        let mut entries = Vec::new();
278        let mut spans = Vec::with_capacity(batch.len());
279        for span in batch {
280            let trace_id = hex::encode(span.span_context.trace_id().to_bytes());
281            let span_id = hex::encode(span.span_context.span_id().to_bytes());
282            let time_event = match &self.log_client {
283                None => span
284                    .events
285                    .into_iter()
286                    .map(|event| TimeEvent {
287                        time: Some(event.timestamp.into()),
288                        value: Some(Value::Annotation(Annotation {
289                            description: Some(to_truncate(event.name.into_owned())),
290                            ..Default::default()
291                        })),
292                    })
293                    .collect(),
294                Some(client) => {
295                    entries.extend(span.events.into_iter().map(|event| {
296                        let (mut level, mut target, mut labels) =
297                            (LogSeverity::Default, None, HashMap::default());
298                        for kv in event.attributes {
299                            match kv.key.as_str() {
300                                "level" => {
301                                    level = match kv.value.as_str().as_ref() {
302                                        "DEBUG" | "TRACE" => LogSeverity::Debug,
303                                        "INFO" => LogSeverity::Info,
304                                        "WARN" => LogSeverity::Warning,
305                                        "ERROR" => LogSeverity::Error,
306                                        _ => LogSeverity::Default, // tracing::Level is limited to the above 5
307                                    }
308                                }
309                                "target" => target = Some(kv.value.as_str().into_owned()),
310                                key => {
311                                    labels.insert(key.to_owned(), kv.value.as_str().into_owned());
312                                }
313                            }
314                        }
315                        let project_id = self.authorizer.project_id();
316                        let log_id = &client.context.log_id;
317                        LogEntry {
318                            log_name: format!("projects/{project_id}/logs/{log_id}"),
319                            resource: Some(client.context.resource.clone()),
320                            severity: level as i32,
321                            timestamp: Some(event.timestamp.into()),
322                            labels,
323                            trace: format!("projects/{project_id}/traces/{trace_id}"),
324                            span_id: span_id.clone(),
325                            source_location: target.map(|target| LogEntrySourceLocation {
326                                file: String::new(),
327                                line: 0,
328                                function: target,
329                            }),
330                            payload: Some(Payload::TextPayload(event.name.into_owned())),
331                            // severity, source_location, text_payload
332                            ..Default::default()
333                        }
334                    }));
335
336                    vec![]
337                }
338            };
339
340            let resource = self.resource.read().ok();
341            let attributes = match resource {
342                Some(resource) => Attributes::new(span.attributes, resource.as_ref()),
343                None => Attributes::new(span.attributes, None),
344            };
345
346            spans.push(Span {
347                name: format!(
348                    "projects/{}/traces/{}/spans/{}",
349                    self.authorizer.project_id(),
350                    hex::encode(span.span_context.trace_id().to_bytes()),
351                    hex::encode(span.span_context.span_id().to_bytes())
352                ),
353                display_name: Some(to_truncate(span.name.into_owned())),
354                span_id: hex::encode(span.span_context.span_id().to_bytes()),
355                // From the API docs: If this is a root span,
356                // then this field must be empty.
357                parent_span_id: match span.parent_span_id {
358                    SpanId::INVALID => "".to_owned(),
359                    _ => hex::encode(span.parent_span_id.to_bytes()),
360                },
361                start_time: Some(span.start_time.into()),
362                end_time: Some(span.end_time.into()),
363                attributes: Some(attributes),
364                time_events: Some(TimeEvents {
365                    time_event,
366                    ..Default::default()
367                }),
368                links: transform_links(&span.links),
369                status: status(span.status),
370                span_kind: SpanKind::from(span.span_kind) as i32,
371                ..Default::default()
372            });
373        }
374
375        let mut req = Request::new(BatchWriteSpansRequest {
376            name: format!("projects/{}", self.authorizer.project_id()),
377            spans,
378        });
379
380        self.pending_count.fetch_sub(1, Ordering::Relaxed);
381        if let Err(e) = self.authorizer.authorize(&mut req, &self.scopes).await {
382            otel_error!(name: "ExportAuthorizeError", error = format!("{:?}", TraceError::from(Error::Authorizer(e.into()))));
383        } else if let Err(e) = self.trace_client.batch_write_spans(req).await {
384            otel_error!(name: "ExportTransportError", error = format!("{:?}", TraceError::from(Error::Transport(e.into()))));
385        }
386
387        let client = match &mut self.log_client {
388            Some(client) => client,
389            None => return,
390        };
391
392        let mut req = Request::new(WriteLogEntriesRequest {
393            log_name: format!(
394                "projects/{}/logs/{}",
395                self.authorizer.project_id(),
396                client.context.log_id,
397            ),
398            entries,
399            dry_run: false,
400            labels: HashMap::default(),
401            partial_success: true,
402            resource: None,
403        });
404
405        if let Err(e) = self.authorizer.authorize(&mut req, &self.scopes).await {
406            otel_error!(name: "ExportAuthorizeError", error = format!("{:?}", TraceError::from(Error::Authorizer(e.into()))));
407        } else if let Err(e) = client.client.write_log_entries(req).await {
408            otel_error!(name: "ExportTransportError", error = format!("{:?}", TraceError::from(Error::Transport(e.into()))));
409        }
410    }
411}
412
413#[cfg(feature = "gcp-authorizer")]
414pub struct GcpAuthorizer {
415    provider: Arc<dyn gcp_auth::TokenProvider>,
416    project_id: Arc<str>,
417}
418
419#[cfg(feature = "gcp-authorizer")]
420impl GcpAuthorizer {
421    pub async fn new() -> Result<Self, Error> {
422        let provider = gcp_auth::provider()
423            .await
424            .map_err(|e| Error::Authorizer(e.into()))?;
425
426        let project_id = provider
427            .project_id()
428            .await
429            .map_err(|e| Error::Authorizer(e.into()))?;
430
431        Ok(Self {
432            provider,
433            project_id,
434        })
435    }
436    pub fn from_gcp_auth(provider: Arc<dyn gcp_auth::TokenProvider>, project_id: Arc<str>) -> Self {
437        Self {
438            provider,
439            project_id,
440        }
441    }
442}
443
444#[cfg(feature = "gcp-authorizer")]
445#[async_trait]
446impl Authorizer for GcpAuthorizer {
447    type Error = Error;
448
449    fn project_id(&self) -> &str {
450        &self.project_id
451    }
452
453    async fn authorize<T: Send + Sync>(
454        &self,
455        req: &mut Request<T>,
456        scopes: &[&str],
457    ) -> Result<(), Self::Error> {
458        let token = self
459            .provider
460            .token(scopes)
461            .await
462            .map_err(|e| Error::Authorizer(e.into()))?;
463
464        req.metadata_mut().insert(
465            "authorization",
466            MetadataValue::try_from(format!("Bearer {}", token.as_str())).unwrap(),
467        );
468
469        Ok(())
470    }
471}
472
473#[async_trait]
474pub trait Authorizer: Sync + Send + 'static {
475    type Error: std::error::Error + fmt::Debug + Send + Sync;
476
477    fn project_id(&self) -> &str;
478    async fn authorize<T: Send + Sync>(
479        &self,
480        request: &mut Request<T>,
481        scopes: &[&str],
482    ) -> Result<(), Self::Error>;
483}
484
485impl From<Value> for AttributeValue {
486    fn from(v: Value) -> AttributeValue {
487        use proto::devtools::cloudtrace::v2::attribute_value;
488        let new_value = match v {
489            Value::Bool(v) => attribute_value::Value::BoolValue(v),
490            Value::F64(v) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
491            Value::I64(v) => attribute_value::Value::IntValue(v),
492            Value::String(v) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
493            Value::Array(_) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
494            _ => attribute_value::Value::StringValue(to_truncate("".to_string())),
495        };
496        AttributeValue {
497            value: Some(new_value),
498        }
499    }
500}
501
502fn to_truncate(s: String) -> TruncatableString {
503    TruncatableString {
504        value: s,
505        ..Default::default()
506    }
507}
508
509#[derive(Debug, Error)]
510pub enum Error {
511    #[error("authorizer error: {0}")]
512    Authorizer(#[source] Box<dyn std::error::Error + Send + Sync>),
513    #[error("I/O error: {0}")]
514    Io(#[from] std::io::Error),
515    #[error("{0}")]
516    Other(#[from] Box<dyn std::error::Error + Send + Sync>),
517    #[error("tonic error: {0}")]
518    Transport(#[source] Box<dyn std::error::Error + Send + Sync>),
519}
520
521impl opentelemetry::trace::ExportError for Error {
522    fn exporter_name(&self) -> &'static str {
523        "stackdriver"
524    }
525}
526
527/// As defined in https://cloud.google.com/logging/docs/reference/v2/rpc/google.logging.type#google.logging.type.LogSeverity.
528enum LogSeverity {
529    Default = 0,
530    Debug = 100,
531    Info = 200,
532    Warning = 400,
533    Error = 500,
534}
535
536#[derive(Clone)]
537struct LogClient {
538    client: LoggingServiceV2Client<Channel>,
539    context: Arc<InternalLogContext>,
540}
541
542struct InternalLogContext {
543    log_id: String,
544    resource: proto::api::MonitoredResource,
545}
546
547#[derive(Clone)]
548pub struct LogContext {
549    pub log_id: String,
550    pub resource: MonitoredResource,
551}
552
553impl From<LogContext> for InternalLogContext {
554    fn from(cx: LogContext) -> Self {
555        let mut labels = HashMap::default();
556        let resource = match cx.resource {
557            MonitoredResource::CloudRunJob {
558                project_id,
559                job_name,
560                location,
561            } => {
562                labels.insert("project_id".to_string(), project_id);
563                if let Some(job_name) = job_name {
564                    labels.insert("job_name".to_string(), job_name);
565                }
566                if let Some(location) = location {
567                    labels.insert("location".to_string(), location);
568                }
569
570                proto::api::MonitoredResource {
571                    r#type: "cloud_run_job".to_owned(),
572                    labels,
573                }
574            }
575            MonitoredResource::CloudRunRevision {
576                project_id,
577                service_name,
578                revision_name,
579                location,
580                configuration_name,
581            } => {
582                labels.insert("project_id".to_string(), project_id);
583                if let Some(service_name) = service_name {
584                    labels.insert("service_name".to_string(), service_name);
585                }
586                if let Some(revision_name) = revision_name {
587                    labels.insert("revision_name".to_string(), revision_name);
588                }
589                if let Some(location) = location {
590                    labels.insert("location".to_string(), location);
591                }
592                if let Some(configuration_name) = configuration_name {
593                    labels.insert("configuration_name".to_string(), configuration_name);
594                }
595
596                proto::api::MonitoredResource {
597                    r#type: "cloud_run_revision".to_owned(),
598                    labels,
599                }
600            }
601            MonitoredResource::GenericNode {
602                project_id,
603                location,
604                namespace,
605                node_id,
606            } => {
607                labels.insert("project_id".to_string(), project_id);
608                if let Some(location) = location {
609                    labels.insert("location".to_string(), location);
610                }
611                if let Some(namespace) = namespace {
612                    labels.insert("namespace".to_string(), namespace);
613                }
614                if let Some(node_id) = node_id {
615                    labels.insert("node_id".to_string(), node_id);
616                }
617
618                proto::api::MonitoredResource {
619                    r#type: "generic_node".to_owned(),
620                    labels,
621                }
622            }
623            MonitoredResource::GenericTask {
624                project_id,
625                location,
626                namespace,
627                job,
628                task_id,
629            } => {
630                labels.insert("project_id".to_owned(), project_id);
631                if let Some(location) = location {
632                    labels.insert("location".to_owned(), location);
633                }
634                if let Some(namespace) = namespace {
635                    labels.insert("namespace".to_owned(), namespace);
636                }
637                if let Some(job) = job {
638                    labels.insert("job".to_owned(), job);
639                }
640                if let Some(task_id) = task_id {
641                    labels.insert("task_id".to_owned(), task_id);
642                }
643
644                proto::api::MonitoredResource {
645                    r#type: "generic_task".to_owned(),
646                    labels,
647                }
648            }
649            MonitoredResource::Global { project_id } => {
650                labels.insert("project_id".to_owned(), project_id);
651                proto::api::MonitoredResource {
652                    r#type: "global".to_owned(),
653                    labels,
654                }
655            }
656        };
657
658        Self {
659            log_id: cx.log_id,
660            resource,
661        }
662    }
663}
664
665/// A description of a `MonitoredResource`.
666///
667/// Possible values are listed in the [API documentation](https://cloud.google.com/logging/docs/api/v2/resource-list).
668/// Please submit an issue or pull request if you want to use a resource type not listed here.
669#[derive(Clone)]
670pub enum MonitoredResource {
671    Global {
672        project_id: String,
673    },
674    GenericNode {
675        project_id: String,
676        location: Option<String>,
677        namespace: Option<String>,
678        node_id: Option<String>,
679    },
680    GenericTask {
681        project_id: String,
682        location: Option<String>,
683        namespace: Option<String>,
684        job: Option<String>,
685        task_id: Option<String>,
686    },
687    CloudRunJob {
688        project_id: String,
689        job_name: Option<String>,
690        location: Option<String>,
691    },
692    CloudRunRevision {
693        project_id: String,
694        service_name: Option<String>,
695        revision_name: Option<String>,
696        location: Option<String>,
697        configuration_name: Option<String>,
698    },
699}
700
701impl Attributes {
702    /// Combines `EvictedHashMap` and `Resource` attributes into a maximum of 32.
703    ///
704    /// The `Resource` takes precedence over the `EvictedHashMap` attributes.
705    fn new(attributes: Vec<KeyValue>, resource: Option<&Resource>) -> Self {
706        let mut new = Self {
707            dropped_attributes_count: 0,
708            attribute_map: HashMap::with_capacity(Ord::min(
709                MAX_ATTRIBUTES_PER_SPAN,
710                attributes.len() + resource.map_or(0, |r| r.len()),
711            )),
712        };
713
714        if let Some(resource) = resource {
715            for (k, v) in resource.iter() {
716                new.push(Cow::Borrowed(k), Cow::Borrowed(v));
717            }
718        }
719
720        for kv in attributes {
721            new.push(Cow::Owned(kv.key), Cow::Owned(kv.value));
722        }
723
724        new
725    }
726
727    fn push(&mut self, key: Cow<'_, Key>, value: Cow<'_, Value>) {
728        if self.attribute_map.len() >= MAX_ATTRIBUTES_PER_SPAN {
729            self.dropped_attributes_count += 1;
730            return;
731        }
732
733        let key_str = key.as_str();
734        if key_str.len() > 128 {
735            self.dropped_attributes_count += 1;
736            return;
737        }
738
739        for (otel_key, gcp_key) in KEY_MAP {
740            if otel_key == key_str {
741                self.attribute_map
742                    .insert(gcp_key.to_owned(), value.into_owned().into());
743                return;
744            }
745        }
746
747        self.attribute_map.insert(
748            match key {
749                Cow::Owned(k) => k.to_string(),
750                Cow::Borrowed(k) => k.to_string(),
751            },
752            value.into_owned().into(),
753        );
754    }
755}
756
757fn transform_links(links: &opentelemetry_sdk::trace::SpanLinks) -> Option<Links> {
758    if links.is_empty() {
759        return None;
760    }
761
762    Some(Links {
763        dropped_links_count: links.dropped_count as i32,
764        link: links
765            .iter()
766            .map(|link| Link {
767                trace_id: hex::encode(link.span_context.trace_id().to_bytes()),
768                span_id: hex::encode(link.span_context.span_id().to_bytes()),
769                ..Default::default()
770            })
771            .collect(),
772    })
773}
774
775// Map conventional OpenTelemetry keys to their GCP counterparts.
776//
777// https://cloud.google.com/trace/docs/trace-labels
778const KEY_MAP: [(&str, &str); 19] = [
779    (HTTP_PATH, GCP_HTTP_PATH),
780    (semconv::attribute::HTTP_HOST, "/http/host"),
781    ("http.request.header.host", "/http/host"),
782    (semconv::attribute::HTTP_METHOD, "/http/method"),
783    (semconv::attribute::HTTP_REQUEST_METHOD, "/http/method"),
784    (semconv::attribute::HTTP_TARGET, "/http/path"),
785    (semconv::attribute::URL_PATH, "/http/path"),
786    (semconv::attribute::HTTP_URL, "/http/url"),
787    (semconv::attribute::URL_FULL, "/http/url"),
788    (semconv::attribute::HTTP_USER_AGENT, "/http/user_agent"),
789    (semconv::attribute::USER_AGENT_ORIGINAL, "/http/user_agent"),
790    (semconv::attribute::HTTP_STATUS_CODE, "/http/status_code"),
791    // https://cloud.google.com/trace/docs/trace-labels#canonical-gke
792    (
793        semconv::attribute::HTTP_RESPONSE_STATUS_CODE,
794        "/http/status_code",
795    ),
796    (
797        semconv::attribute::K8S_CLUSTER_NAME,
798        "g.co/r/k8s_container/cluster_name",
799    ),
800    (
801        semconv::attribute::K8S_NAMESPACE_NAME,
802        "g.co/r/k8s_container/namespace",
803    ),
804    (
805        semconv::attribute::K8S_POD_NAME,
806        "g.co/r/k8s_container/pod_name",
807    ),
808    (
809        semconv::attribute::K8S_CONTAINER_NAME,
810        "g.co/r/k8s_container/container_name",
811    ),
812    (semconv::trace::HTTP_ROUTE, "/http/route"),
813    (HTTP_PATH, GCP_HTTP_PATH),
814];
815
816const HTTP_PATH: &str = "http.path";
817const GCP_HTTP_PATH: &str = "/http/path";
818
819impl From<opentelemetry::trace::SpanKind> for SpanKind {
820    fn from(span_kind: opentelemetry::trace::SpanKind) -> Self {
821        match span_kind {
822            opentelemetry::trace::SpanKind::Client => SpanKind::Client,
823            opentelemetry::trace::SpanKind::Server => SpanKind::Server,
824            opentelemetry::trace::SpanKind::Producer => SpanKind::Producer,
825            opentelemetry::trace::SpanKind::Consumer => SpanKind::Consumer,
826            opentelemetry::trace::SpanKind::Internal => SpanKind::Internal,
827        }
828    }
829}
830
831fn status(value: opentelemetry::trace::Status) -> Option<Status> {
832    match value {
833        opentelemetry::trace::Status::Ok => Some(Status {
834            code: Code::Ok as i32,
835            message: "".to_owned(),
836            details: vec![],
837        }),
838        opentelemetry::trace::Status::Unset => None,
839        opentelemetry::trace::Status::Error { description } => Some(Status {
840            code: Code::Unknown as i32,
841            message: description.into(),
842            details: vec![],
843        }),
844    }
845}
846const TRACE_APPEND: &str = "https://www.googleapis.com/auth/trace.append";
847const LOGGING_WRITE: &str = "https://www.googleapis.com/auth/logging.write";
848const MAX_ATTRIBUTES_PER_SPAN: usize = 32;
849
850#[cfg(test)]
851mod tests {
852    use super::*;
853    use opentelemetry::{KeyValue, Value};
854    use opentelemetry_semantic_conventions as semcov;
855
856    #[test]
857    fn test_attributes_mapping() {
858        let capacity = 10;
859        let mut attributes = Vec::with_capacity(capacity);
860
861        //	hostAttribute       = "http.host"
862        attributes.push(KeyValue::new(
863            semconv::attribute::HTTP_HOST,
864            "example.com:8080",
865        ));
866
867        // 	methodAttribute     = "http.method"
868        attributes.push(KeyValue::new(semcov::attribute::HTTP_METHOD, "POST"));
869
870        // 	pathAttribute       = "http.path"
871        attributes.push(KeyValue::new(HTTP_PATH, "/path/12314/?q=ddds#123"));
872
873        // 	urlAttribute        = "http.url"
874        attributes.push(KeyValue::new(
875            semcov::attribute::HTTP_URL,
876            "https://example.com:8080/webshop/articles/4?s=1",
877        ));
878
879        // 	userAgentAttribute  = "http.user_agent"
880        attributes.push(KeyValue::new(
881            semconv::attribute::HTTP_USER_AGENT,
882            "CERN-LineMode/2.15 libwww/2.17b3",
883        ));
884
885        // 	statusCodeAttribute = "http.status_code"
886        attributes.push(KeyValue::new(semcov::attribute::HTTP_STATUS_CODE, 200i64));
887
888        // 	statusCodeAttribute = "http.route"
889        attributes.push(KeyValue::new(
890            semcov::trace::HTTP_ROUTE,
891            "/webshop/articles/:article_id",
892        ));
893
894        // 	serviceAttribute    = "service.name"
895        let resources = Resource::builder_empty()
896            .with_attributes([KeyValue::new(
897                semcov::resource::SERVICE_NAME,
898                "Test Service Name",
899            )])
900            .build();
901
902        let actual = Attributes::new(attributes, Some(&resources));
903        assert_eq!(actual.attribute_map.len(), 8);
904        assert_eq!(actual.dropped_attributes_count, 0);
905        assert_eq!(
906            actual.attribute_map.get("/http/host"),
907            Some(&AttributeValue::from(Value::String(
908                "example.com:8080".into()
909            )))
910        );
911        assert_eq!(
912            actual.attribute_map.get("/http/method"),
913            Some(&AttributeValue::from(Value::String("POST".into()))),
914        );
915        assert_eq!(
916            actual.attribute_map.get("/http/path"),
917            Some(&AttributeValue::from(Value::String(
918                "/path/12314/?q=ddds#123".into()
919            ))),
920        );
921        assert_eq!(
922            actual.attribute_map.get("/http/route"),
923            Some(&AttributeValue::from(Value::String(
924                "/webshop/articles/:article_id".into()
925            ))),
926        );
927        assert_eq!(
928            actual.attribute_map.get("/http/url"),
929            Some(&AttributeValue::from(Value::String(
930                "https://example.com:8080/webshop/articles/4?s=1".into(),
931            ))),
932        );
933        assert_eq!(
934            actual.attribute_map.get("/http/user_agent"),
935            Some(&AttributeValue::from(Value::String(
936                "CERN-LineMode/2.15 libwww/2.17b3".into()
937            ))),
938        );
939        assert_eq!(
940            actual.attribute_map.get("/http/status_code"),
941            Some(&AttributeValue::from(Value::I64(200))),
942        );
943    }
944
945    #[test]
946    fn test_too_many() {
947        let resources = Resource::builder_empty()
948            .with_attributes([KeyValue::new(
949                semconv::attribute::USER_AGENT_ORIGINAL,
950                "Test Service Name UA",
951            )])
952            .build();
953        let mut attributes = Vec::with_capacity(32);
954        for i in 0..32 {
955            attributes.push(KeyValue::new(
956                format!("key{}", i),
957                Value::String(format!("value{}", i).into()),
958            ));
959        }
960
961        let actual = Attributes::new(attributes, Some(&resources));
962        assert_eq!(actual.attribute_map.len(), 32);
963        assert_eq!(actual.dropped_attributes_count, 1);
964        assert_eq!(
965            actual.attribute_map.get("/http/user_agent"),
966            Some(&AttributeValue::from(Value::String(
967                "Test Service Name UA".into()
968            ))),
969        );
970    }
971
972    #[test]
973    fn test_attributes_mapping_http_target() {
974        let attributes = vec![KeyValue::new(
975            semcov::attribute::HTTP_TARGET,
976            "/path/12314/?q=ddds#123",
977        )];
978
979        //	hostAttribute       = "http.target"
980
981        let resources = Resource::builder_empty().with_attributes([]).build();
982        let actual = Attributes::new(attributes, Some(&resources));
983        assert_eq!(actual.attribute_map.len(), 1);
984        assert_eq!(actual.dropped_attributes_count, 0);
985        assert_eq!(
986            actual.attribute_map.get("/http/path"),
987            Some(&AttributeValue::from(Value::String(
988                "/path/12314/?q=ddds#123".into()
989            ))),
990        );
991    }
992
993    #[test]
994    fn test_attributes_mapping_dropped_attributes_count() {
995        let attributes = vec![KeyValue::new("answer", Value::I64(42)),KeyValue::new("long_attribute_key_dvwmacxpeefbuemoxljmqvldjxmvvihoeqnuqdsyovwgljtnemouidabhkmvsnauwfnaihekcfwhugejboiyfthyhmkpsaxtidlsbwsmirebax", Value::String("Some value".into()))];
996
997        let resources = Resource::builder_empty().with_attributes([]).build();
998        let actual = Attributes::new(attributes, Some(&resources));
999        assert_eq!(
1000            actual,
1001            Attributes {
1002                attribute_map: HashMap::from([(
1003                    "answer".into(),
1004                    AttributeValue::from(Value::I64(42))
1005                ),]),
1006                dropped_attributes_count: 1,
1007            }
1008        );
1009        assert_eq!(actual.attribute_map.len(), 1);
1010        assert_eq!(actual.dropped_attributes_count, 1);
1011    }
1012}