opentelemetry_stackdriver/
lib.rs

1#![cfg(not(doctest))]
2// unfortunately the proto code includes comments from the google proto files
3// that are interpreted as "doc tests" and will fail to build.
4// When this PR is merged we should be able to remove this attribute:
5// https://github.com/danburkert/prost/pull/291
6#![allow(
7    clippy::doc_lazy_continuation,
8    deprecated,
9    rustdoc::bare_urls,
10    rustdoc::broken_intra_doc_links,
11    rustdoc::invalid_rust_codeblocks
12)]
13
14use std::{
15    borrow::Cow,
16    collections::HashMap,
17    fmt,
18    future::Future,
19    sync::{
20        atomic::{AtomicUsize, Ordering},
21        Arc, RwLock,
22    },
23    time::{Duration, Instant},
24};
25
26use futures_util::stream::StreamExt;
27use opentelemetry::{otel_error, trace::SpanId, Key, KeyValue, Value};
28use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
29use opentelemetry_sdk::{
30    trace::{SpanData, SpanExporter},
31    Resource,
32};
33use opentelemetry_semantic_conventions as semconv;
34use thiserror::Error;
35#[cfg(feature = "gcp-authorizer")]
36use tonic::metadata::MetadataValue;
37use tonic::{
38    transport::{Channel, ClientTlsConfig},
39    Code, Request,
40};
41
42#[allow(clippy::derive_partial_eq_without_eq)] // tonic doesn't derive Eq for generated types
43pub mod proto;
44
45#[cfg(feature = "propagator")]
46pub mod google_trace_context_propagator;
47
48use proto::devtools::cloudtrace::v2::span::time_event::Annotation;
49use proto::devtools::cloudtrace::v2::span::{
50    Attributes, Link, Links, SpanKind, TimeEvent, TimeEvents,
51};
52use proto::devtools::cloudtrace::v2::trace_service_client::TraceServiceClient;
53use proto::devtools::cloudtrace::v2::{
54    AttributeValue, BatchWriteSpansRequest, Span, TruncatableString,
55};
56use proto::logging::v2::{
57    log_entry::Payload, logging_service_v2_client::LoggingServiceV2Client, LogEntry,
58    LogEntrySourceLocation, WriteLogEntriesRequest,
59};
60use proto::rpc::Status;
61
62/// Exports opentelemetry tracing spans to Google StackDriver.
63///
64/// As of the time of this writing, the opentelemetry crate exposes no link information
65/// so this struct does not send link information.
66#[derive(Clone)]
67pub struct StackDriverExporter {
68    tx: futures_channel::mpsc::Sender<Vec<SpanData>>,
69    pending_count: Arc<AtomicUsize>,
70    maximum_shutdown_duration: Duration,
71    resource: Arc<RwLock<Option<Resource>>>,
72}
73
74impl StackDriverExporter {
75    pub fn builder() -> Builder {
76        Builder::default()
77    }
78
79    pub fn pending_count(&self) -> usize {
80        self.pending_count.load(Ordering::Relaxed)
81    }
82}
83
84impl SpanExporter for StackDriverExporter {
85    async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
86        match self.tx.clone().try_send(batch) {
87            Err(e) => Err(OTelSdkError::InternalFailure(format!("{:?}", e))),
88            Ok(()) => {
89                self.pending_count.fetch_add(1, Ordering::Relaxed);
90                Ok(())
91            }
92        }
93    }
94
95    fn shutdown(&mut self) -> OTelSdkResult {
96        let start = Instant::now();
97        while (Instant::now() - start) < self.maximum_shutdown_duration && self.pending_count() > 0
98        {
99            std::thread::yield_now();
100            // Spin for a bit and give the inner export some time to upload, with a timeout.
101        }
102        Ok(())
103    }
104
105    fn set_resource(&mut self, resource: &Resource) {
106        match self.resource.write() {
107            Ok(mut guard) => *guard = Some(resource.clone()),
108            Err(poisoned) => *poisoned.into_inner() = Some(resource.clone()),
109        }
110    }
111}
112
113impl fmt::Debug for StackDriverExporter {
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        #[allow(clippy::unneeded_field_pattern)]
116        let Self {
117            tx: _,
118            pending_count,
119            maximum_shutdown_duration,
120            resource: _,
121        } = self;
122        f.debug_struct("StackDriverExporter")
123            .field("tx", &"(elided)")
124            .field("pending_count", pending_count)
125            .field("maximum_shutdown_duration", maximum_shutdown_duration)
126            .finish()
127    }
128}
129
130/// Helper type to build a `StackDriverExporter`.
131#[derive(Clone, Default)]
132pub struct Builder {
133    maximum_shutdown_duration: Option<Duration>,
134    num_concurrent_requests: Option<usize>,
135    log_context: Option<LogContext>,
136}
137
138impl Builder {
139    /// Set the maximum shutdown duration to export all the remaining data.
140    ///
141    /// If not set, defaults to 5 seconds.
142    pub fn maximum_shutdown_duration(mut self, duration: Duration) -> Self {
143        self.maximum_shutdown_duration = Some(duration);
144        self
145    }
146
147    /// Set the number of concurrent requests.
148    ///
149    /// If `num_concurrent_requests` is set to `0` or `None` then no limit is enforced.
150    pub fn num_concurrent_requests(mut self, num_concurrent_requests: usize) -> Self {
151        self.num_concurrent_requests = Some(num_concurrent_requests);
152        self
153    }
154
155    /// Enable writing log entries with the given `log_context`.
156    pub fn log_context(mut self, log_context: LogContext) -> Self {
157        self.log_context = Some(log_context);
158        self
159    }
160
161    pub async fn build<A: Authorizer>(
162        self,
163        authenticator: A,
164    ) -> Result<(StackDriverExporter, impl Future<Output = ()>), Error>
165    where
166        Error: From<A::Error>,
167    {
168        let Self {
169            maximum_shutdown_duration,
170            num_concurrent_requests,
171            log_context,
172        } = self;
173        let uri = http::uri::Uri::from_static("https://cloudtrace.googleapis.com:443");
174
175        #[cfg(all(feature = "tls-native-roots", not(feature = "tls-webpki-roots")))]
176        let tls_config = ClientTlsConfig::new().with_native_roots();
177        #[cfg(feature = "tls-webpki-roots")]
178        let tls_config = ClientTlsConfig::new().with_webpki_roots();
179        #[cfg(not(any(feature = "tls-native-roots", feature = "tls-webpki-roots")))]
180        let tls_config = ClientTlsConfig::new();
181
182        let trace_channel = Channel::builder(uri)
183            .tls_config(tls_config.clone())
184            .map_err(|e| Error::Transport(e.into()))?
185            .connect()
186            .await
187            .map_err(|e| Error::Transport(e.into()))?;
188
189        let log_client = match log_context {
190            Some(log_context) => {
191                let log_channel = Channel::builder(http::uri::Uri::from_static(
192                    "https://logging.googleapis.com:443",
193                ))
194                .tls_config(tls_config)
195                .map_err(|e| Error::Transport(e.into()))?
196                .connect()
197                .await
198                .map_err(|e| Error::Transport(e.into()))?;
199
200                Some(LogClient {
201                    client: LoggingServiceV2Client::new(log_channel),
202                    context: Arc::new(InternalLogContext::from(log_context)),
203                })
204            }
205            None => None,
206        };
207
208        let (tx, rx) = futures_channel::mpsc::channel(64);
209        let pending_count = Arc::new(AtomicUsize::new(0));
210        let scopes = Arc::new(match log_client {
211            Some(_) => vec![TRACE_APPEND, LOGGING_WRITE],
212            None => vec![TRACE_APPEND],
213        });
214
215        let count_clone = pending_count.clone();
216        let resource = Arc::new(RwLock::new(None));
217        let ctx_resource = resource.clone();
218        let future = async move {
219            let trace_client = TraceServiceClient::new(trace_channel);
220            let authorizer = &authenticator;
221            let log_client = log_client.clone();
222            rx.for_each_concurrent(num_concurrent_requests, move |batch| {
223                let trace_client = trace_client.clone();
224                let log_client = log_client.clone();
225                let pending_count = count_clone.clone();
226                let scopes = scopes.clone();
227                let resource = ctx_resource.clone();
228                ExporterContext {
229                    trace_client,
230                    log_client,
231                    authorizer,
232                    pending_count,
233                    scopes,
234                    resource,
235                }
236                .export(batch)
237            })
238            .await
239        };
240
241        let exporter = StackDriverExporter {
242            tx,
243            pending_count,
244            maximum_shutdown_duration: maximum_shutdown_duration
245                .unwrap_or_else(|| Duration::from_secs(5)),
246            resource,
247        };
248
249        Ok((exporter, future))
250    }
251}
252
253struct ExporterContext<'a, A> {
254    trace_client: TraceServiceClient<Channel>,
255    log_client: Option<LogClient>,
256    authorizer: &'a A,
257    pending_count: Arc<AtomicUsize>,
258    scopes: Arc<Vec<&'static str>>,
259    resource: Arc<RwLock<Option<Resource>>>,
260}
261
262impl<A: Authorizer> ExporterContext<'_, A>
263where
264    Error: From<A::Error>,
265{
266    async fn export(mut self, batch: Vec<SpanData>) {
267        use proto::devtools::cloudtrace::v2::span::time_event::Value;
268
269        let mut entries = Vec::new();
270        let mut spans = Vec::with_capacity(batch.len());
271        for span in batch {
272            let trace_id = hex::encode(span.span_context.trace_id().to_bytes());
273            let span_id = hex::encode(span.span_context.span_id().to_bytes());
274            let time_event = match &self.log_client {
275                None => span
276                    .events
277                    .into_iter()
278                    .map(|event| TimeEvent {
279                        time: Some(event.timestamp.into()),
280                        value: Some(Value::Annotation(Annotation {
281                            description: Some(to_truncate(event.name.into_owned())),
282                            ..Default::default()
283                        })),
284                    })
285                    .collect(),
286                Some(client) => {
287                    entries.extend(span.events.into_iter().map(|event| {
288                        let (mut level, mut target, mut labels) =
289                            (LogSeverity::Default, None, HashMap::default());
290                        for kv in event.attributes {
291                            match kv.key.as_str() {
292                                "level" => {
293                                    level = match kv.value.as_str().as_ref() {
294                                        "DEBUG" | "TRACE" => LogSeverity::Debug,
295                                        "INFO" => LogSeverity::Info,
296                                        "WARN" => LogSeverity::Warning,
297                                        "ERROR" => LogSeverity::Error,
298                                        _ => LogSeverity::Default, // tracing::Level is limited to the above 5
299                                    }
300                                }
301                                "target" => target = Some(kv.value.as_str().into_owned()),
302                                key => {
303                                    labels.insert(key.to_owned(), kv.value.as_str().into_owned());
304                                }
305                            }
306                        }
307                        let project_id = self.authorizer.project_id();
308                        let log_id = &client.context.log_id;
309                        LogEntry {
310                            log_name: format!("projects/{project_id}/logs/{log_id}"),
311                            resource: Some(client.context.resource.clone()),
312                            severity: level as i32,
313                            timestamp: Some(event.timestamp.into()),
314                            labels,
315                            trace: format!("projects/{project_id}/traces/{trace_id}"),
316                            span_id: span_id.clone(),
317                            source_location: target.map(|target| LogEntrySourceLocation {
318                                file: String::new(),
319                                line: 0,
320                                function: target,
321                            }),
322                            payload: Some(Payload::TextPayload(event.name.into_owned())),
323                            // severity, source_location, text_payload
324                            ..Default::default()
325                        }
326                    }));
327
328                    vec![]
329                }
330            };
331
332            let resource = self.resource.read().ok();
333            let attributes = match resource {
334                Some(resource) => Attributes::new(span.attributes, resource.as_ref()),
335                None => Attributes::new(span.attributes, None),
336            };
337
338            spans.push(Span {
339                name: format!(
340                    "projects/{}/traces/{}/spans/{}",
341                    self.authorizer.project_id(),
342                    hex::encode(span.span_context.trace_id().to_bytes()),
343                    hex::encode(span.span_context.span_id().to_bytes())
344                ),
345                display_name: Some(to_truncate(span.name.into_owned())),
346                span_id: hex::encode(span.span_context.span_id().to_bytes()),
347                // From the API docs: If this is a root span,
348                // then this field must be empty.
349                parent_span_id: match span.parent_span_id {
350                    SpanId::INVALID => "".to_owned(),
351                    _ => hex::encode(span.parent_span_id.to_bytes()),
352                },
353                start_time: Some(span.start_time.into()),
354                end_time: Some(span.end_time.into()),
355                attributes: Some(attributes),
356                time_events: Some(TimeEvents {
357                    time_event,
358                    ..Default::default()
359                }),
360                links: transform_links(&span.links),
361                status: status(span.status),
362                span_kind: SpanKind::from(span.span_kind) as i32,
363                ..Default::default()
364            });
365        }
366
367        let mut req = Request::new(BatchWriteSpansRequest {
368            name: format!("projects/{}", self.authorizer.project_id()),
369            spans,
370        });
371
372        self.pending_count.fetch_sub(1, Ordering::Relaxed);
373        if let Err(e) = self.authorizer.authorize(&mut req, &self.scopes).await {
374            otel_error!(name: "ExportAuthorizeError", error = format!("{:?}", e));
375        } else if let Err(e) = self.trace_client.batch_write_spans(req).await {
376            otel_error!(name: "ExportTransportError", error = format!("{:?}", e));
377        }
378
379        let client = match &mut self.log_client {
380            Some(client) => client,
381            None => return,
382        };
383
384        let mut req = Request::new(WriteLogEntriesRequest {
385            log_name: format!(
386                "projects/{}/logs/{}",
387                self.authorizer.project_id(),
388                client.context.log_id,
389            ),
390            entries,
391            dry_run: false,
392            labels: HashMap::default(),
393            partial_success: true,
394            resource: None,
395        });
396
397        if let Err(e) = self.authorizer.authorize(&mut req, &self.scopes).await {
398            otel_error!(name: "ExportAuthorizeError", error = format!("{:?}", e));
399        } else if let Err(e) = client.client.write_log_entries(req).await {
400            otel_error!(name: "ExportTransportError", error =  format!("{:?}", e));
401        }
402    }
403}
404
405#[cfg(feature = "gcp-authorizer")]
406pub struct GcpAuthorizer {
407    provider: Arc<dyn gcp_auth::TokenProvider>,
408    project_id: Arc<str>,
409}
410
411#[cfg(feature = "gcp-authorizer")]
412impl GcpAuthorizer {
413    pub async fn new() -> Result<Self, Error> {
414        let provider = gcp_auth::provider()
415            .await
416            .map_err(|e| Error::Authorizer(e.into()))?;
417
418        let project_id = provider
419            .project_id()
420            .await
421            .map_err(|e| Error::Authorizer(e.into()))?;
422
423        Ok(Self {
424            provider,
425            project_id,
426        })
427    }
428    pub fn from_gcp_auth(provider: Arc<dyn gcp_auth::TokenProvider>, project_id: Arc<str>) -> Self {
429        Self {
430            provider,
431            project_id,
432        }
433    }
434}
435
436#[cfg(feature = "gcp-authorizer")]
437impl Authorizer for GcpAuthorizer {
438    type Error = Error;
439
440    fn project_id(&self) -> &str {
441        &self.project_id
442    }
443
444    async fn authorize<T: Send + Sync>(
445        &self,
446        req: &mut Request<T>,
447        scopes: &[&str],
448    ) -> Result<(), Self::Error> {
449        let token = self
450            .provider
451            .token(scopes)
452            .await
453            .map_err(|e| Error::Authorizer(e.into()))?;
454
455        req.metadata_mut().insert(
456            "authorization",
457            MetadataValue::try_from(format!("Bearer {}", token.as_str())).unwrap(),
458        );
459
460        Ok(())
461    }
462}
463
464pub trait Authorizer: Sync + Send + 'static {
465    type Error: std::error::Error + fmt::Debug + Send + Sync;
466
467    fn project_id(&self) -> &str;
468    fn authorize<T: Send + Sync>(
469        &self,
470        request: &mut Request<T>,
471        scopes: &[&str],
472    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
473}
474
475impl From<Value> for AttributeValue {
476    fn from(v: Value) -> AttributeValue {
477        use proto::devtools::cloudtrace::v2::attribute_value;
478        let new_value = match v {
479            Value::Bool(v) => attribute_value::Value::BoolValue(v),
480            Value::F64(v) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
481            Value::I64(v) => attribute_value::Value::IntValue(v),
482            Value::String(v) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
483            Value::Array(_) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
484            _ => attribute_value::Value::StringValue(to_truncate("".to_string())),
485        };
486        AttributeValue {
487            value: Some(new_value),
488        }
489    }
490}
491
492fn to_truncate(s: String) -> TruncatableString {
493    TruncatableString {
494        value: s,
495        ..Default::default()
496    }
497}
498
499#[derive(Debug, Error)]
500pub enum Error {
501    #[error("authorizer error: {0}")]
502    Authorizer(#[source] Box<dyn std::error::Error + Send + Sync>),
503    #[error("I/O error: {0}")]
504    Io(#[from] std::io::Error),
505    #[error("{0}")]
506    Other(#[from] Box<dyn std::error::Error + Send + Sync>),
507    #[error("tonic error: {0}")]
508    Transport(#[source] Box<dyn std::error::Error + Send + Sync>),
509}
510
511impl opentelemetry_sdk::ExportError for Error {
512    fn exporter_name(&self) -> &'static str {
513        "stackdriver"
514    }
515}
516
517/// As defined in https://cloud.google.com/logging/docs/reference/v2/rpc/google.logging.type#google.logging.type.LogSeverity.
518enum LogSeverity {
519    Default = 0,
520    Debug = 100,
521    Info = 200,
522    Warning = 400,
523    Error = 500,
524}
525
526#[derive(Clone)]
527struct LogClient {
528    client: LoggingServiceV2Client<Channel>,
529    context: Arc<InternalLogContext>,
530}
531
532struct InternalLogContext {
533    log_id: String,
534    resource: proto::api::MonitoredResource,
535}
536
537#[derive(Clone)]
538pub struct LogContext {
539    pub log_id: String,
540    pub resource: MonitoredResource,
541}
542
543impl From<LogContext> for InternalLogContext {
544    fn from(cx: LogContext) -> Self {
545        let mut labels = HashMap::default();
546        let resource = match cx.resource {
547            MonitoredResource::AppEngine {
548                project_id,
549                module_id,
550                version_id,
551                zone,
552            } => {
553                labels.insert("project_id".to_string(), project_id);
554                if let Some(module_id) = module_id {
555                    labels.insert("module_id".to_string(), module_id);
556                }
557                if let Some(version_id) = version_id {
558                    labels.insert("version_id".to_string(), version_id);
559                }
560                if let Some(zone) = zone {
561                    labels.insert("zone".to_string(), zone);
562                }
563
564                proto::api::MonitoredResource {
565                    r#type: "gae_app".to_owned(),
566                    labels,
567                }
568            }
569            MonitoredResource::CloudFunction {
570                project_id,
571                function_name,
572                region,
573            } => {
574                labels.insert("project_id".to_string(), project_id);
575                if let Some(function_name) = function_name {
576                    labels.insert("function_name".to_string(), function_name);
577                }
578                if let Some(region) = region {
579                    labels.insert("region".to_string(), region);
580                }
581
582                proto::api::MonitoredResource {
583                    r#type: "cloud_function".to_owned(),
584                    labels,
585                }
586            }
587            MonitoredResource::CloudRunJob {
588                project_id,
589                job_name,
590                location,
591            } => {
592                labels.insert("project_id".to_string(), project_id);
593                if let Some(job_name) = job_name {
594                    labels.insert("job_name".to_string(), job_name);
595                }
596                if let Some(location) = location {
597                    labels.insert("location".to_string(), location);
598                }
599
600                proto::api::MonitoredResource {
601                    r#type: "cloud_run_job".to_owned(),
602                    labels,
603                }
604            }
605            MonitoredResource::CloudRunRevision {
606                project_id,
607                service_name,
608                revision_name,
609                location,
610                configuration_name,
611            } => {
612                labels.insert("project_id".to_string(), project_id);
613                if let Some(service_name) = service_name {
614                    labels.insert("service_name".to_string(), service_name);
615                }
616                if let Some(revision_name) = revision_name {
617                    labels.insert("revision_name".to_string(), revision_name);
618                }
619                if let Some(location) = location {
620                    labels.insert("location".to_string(), location);
621                }
622                if let Some(configuration_name) = configuration_name {
623                    labels.insert("configuration_name".to_string(), configuration_name);
624                }
625
626                proto::api::MonitoredResource {
627                    r#type: "cloud_run_revision".to_owned(),
628                    labels,
629                }
630            }
631
632            MonitoredResource::ComputeEngine {
633                project_id,
634                instance_id,
635                zone,
636            } => {
637                labels.insert("project_id".to_string(), project_id);
638                if let Some(instance_id) = instance_id {
639                    labels.insert("instance_id".to_string(), instance_id);
640                }
641                if let Some(zone) = zone {
642                    labels.insert("zone".to_string(), zone);
643                }
644
645                proto::api::MonitoredResource {
646                    r#type: "gce_instance".to_owned(),
647                    labels,
648                }
649            }
650
651            MonitoredResource::GenericNode {
652                project_id,
653                location,
654                namespace,
655                node_id,
656            } => {
657                labels.insert("project_id".to_string(), project_id);
658                if let Some(location) = location {
659                    labels.insert("location".to_string(), location);
660                }
661                if let Some(namespace) = namespace {
662                    labels.insert("namespace".to_string(), namespace);
663                }
664                if let Some(node_id) = node_id {
665                    labels.insert("node_id".to_string(), node_id);
666                }
667
668                proto::api::MonitoredResource {
669                    r#type: "generic_node".to_owned(),
670                    labels,
671                }
672            }
673            MonitoredResource::GenericTask {
674                project_id,
675                location,
676                namespace,
677                job,
678                task_id,
679            } => {
680                labels.insert("project_id".to_owned(), project_id);
681                if let Some(location) = location {
682                    labels.insert("location".to_owned(), location);
683                }
684                if let Some(namespace) = namespace {
685                    labels.insert("namespace".to_owned(), namespace);
686                }
687                if let Some(job) = job {
688                    labels.insert("job".to_owned(), job);
689                }
690                if let Some(task_id) = task_id {
691                    labels.insert("task_id".to_owned(), task_id);
692                }
693
694                proto::api::MonitoredResource {
695                    r#type: "generic_task".to_owned(),
696                    labels,
697                }
698            }
699            MonitoredResource::Global { project_id } => {
700                labels.insert("project_id".to_owned(), project_id);
701                proto::api::MonitoredResource {
702                    r#type: "global".to_owned(),
703                    labels,
704                }
705            }
706            MonitoredResource::KubernetesEngine {
707                project_id,
708                cluster_name,
709                location,
710                pod_name,
711                namespace_name,
712                container_name,
713            } => {
714                labels.insert("project_id".to_string(), project_id);
715                if let Some(cluster_name) = cluster_name {
716                    labels.insert("cluster_name".to_string(), cluster_name);
717                }
718                if let Some(location) = location {
719                    labels.insert("location".to_string(), location);
720                }
721                if let Some(pod_name) = pod_name {
722                    labels.insert("pod_name".to_string(), pod_name);
723                }
724                if let Some(namespace_name) = namespace_name {
725                    labels.insert("namespace_name".to_string(), namespace_name);
726                }
727                if let Some(container_name) = container_name {
728                    labels.insert("container_name".to_string(), container_name);
729                }
730
731                proto::api::MonitoredResource {
732                    r#type: "k8s_container".to_owned(),
733                    labels,
734                }
735            }
736        };
737
738        Self {
739            log_id: cx.log_id,
740            resource,
741        }
742    }
743}
744
745/// A description of a `MonitoredResource`.
746///
747/// Possible values are listed in the [API documentation](https://cloud.google.com/logging/docs/api/v2/resource-list).
748/// Please submit an issue or pull request if you want to use a resource type not listed here.
749#[derive(Clone)]
750pub enum MonitoredResource {
751    AppEngine {
752        project_id: String,
753        module_id: Option<String>,
754        version_id: Option<String>,
755        zone: Option<String>,
756    },
757    CloudFunction {
758        project_id: String,
759        function_name: Option<String>,
760        region: Option<String>,
761    },
762    CloudRunJob {
763        project_id: String,
764        job_name: Option<String>,
765        location: Option<String>,
766    },
767    CloudRunRevision {
768        project_id: String,
769        service_name: Option<String>,
770        revision_name: Option<String>,
771        location: Option<String>,
772        configuration_name: Option<String>,
773    },
774    ComputeEngine {
775        project_id: String,
776        instance_id: Option<String>,
777        zone: Option<String>,
778    },
779    KubernetesEngine {
780        project_id: String,
781        location: Option<String>,
782        cluster_name: Option<String>,
783        namespace_name: Option<String>,
784        pod_name: Option<String>,
785        container_name: Option<String>,
786    },
787    GenericNode {
788        project_id: String,
789        location: Option<String>,
790        namespace: Option<String>,
791        node_id: Option<String>,
792    },
793    GenericTask {
794        project_id: String,
795        location: Option<String>,
796        namespace: Option<String>,
797        job: Option<String>,
798        task_id: Option<String>,
799    },
800    Global {
801        project_id: String,
802    },
803}
804
805impl Attributes {
806    /// Combines `EvictedHashMap` and `Resource` attributes into a maximum of 32.
807    ///
808    /// The `Resource` takes precedence over the `EvictedHashMap` attributes.
809    fn new(attributes: Vec<KeyValue>, resource: Option<&Resource>) -> Self {
810        let mut new = Self {
811            dropped_attributes_count: 0,
812            attribute_map: HashMap::with_capacity(Ord::min(
813                MAX_ATTRIBUTES_PER_SPAN,
814                attributes.len() + resource.map_or(0, |r| r.len()),
815            )),
816        };
817
818        if let Some(resource) = resource {
819            for (k, v) in resource.iter() {
820                new.push(Cow::Borrowed(k), Cow::Borrowed(v));
821            }
822        }
823
824        for kv in attributes {
825            new.push(Cow::Owned(kv.key), Cow::Owned(kv.value));
826        }
827
828        new
829    }
830
831    fn push(&mut self, key: Cow<'_, Key>, value: Cow<'_, Value>) {
832        if self.attribute_map.len() >= MAX_ATTRIBUTES_PER_SPAN {
833            self.dropped_attributes_count += 1;
834            return;
835        }
836
837        let key_str = key.as_str();
838        if key_str.len() > 128 {
839            self.dropped_attributes_count += 1;
840            return;
841        }
842
843        for (otel_key, gcp_key) in KEY_MAP {
844            if otel_key == key_str {
845                self.attribute_map
846                    .insert(gcp_key.to_owned(), value.into_owned().into());
847                return;
848            }
849        }
850
851        self.attribute_map.insert(
852            match key {
853                Cow::Owned(k) => k.to_string(),
854                Cow::Borrowed(k) => k.to_string(),
855            },
856            value.into_owned().into(),
857        );
858    }
859}
860
861fn transform_links(links: &opentelemetry_sdk::trace::SpanLinks) -> Option<Links> {
862    if links.is_empty() {
863        return None;
864    }
865
866    Some(Links {
867        dropped_links_count: links.dropped_count as i32,
868        link: links
869            .iter()
870            .map(|link| Link {
871                trace_id: hex::encode(link.span_context.trace_id().to_bytes()),
872                span_id: hex::encode(link.span_context.span_id().to_bytes()),
873                ..Default::default()
874            })
875            .collect(),
876    })
877}
878
879// Map conventional OpenTelemetry keys to their GCP counterparts.
880//
881// https://cloud.google.com/trace/docs/trace-labels
882const KEY_MAP: [(&str, &str); 19] = [
883    (HTTP_PATH, GCP_HTTP_PATH),
884    (semconv::attribute::HTTP_HOST, "/http/host"),
885    ("http.request.header.host", "/http/host"),
886    (semconv::attribute::HTTP_METHOD, "/http/method"),
887    (semconv::attribute::HTTP_REQUEST_METHOD, "/http/method"),
888    (semconv::attribute::HTTP_TARGET, "/http/path"),
889    (semconv::attribute::URL_PATH, "/http/path"),
890    (semconv::attribute::HTTP_URL, "/http/url"),
891    (semconv::attribute::URL_FULL, "/http/url"),
892    (semconv::attribute::HTTP_USER_AGENT, "/http/user_agent"),
893    (semconv::attribute::USER_AGENT_ORIGINAL, "/http/user_agent"),
894    (semconv::attribute::HTTP_STATUS_CODE, "/http/status_code"),
895    // https://cloud.google.com/trace/docs/trace-labels#canonical-gke
896    (
897        semconv::attribute::HTTP_RESPONSE_STATUS_CODE,
898        "/http/status_code",
899    ),
900    (
901        semconv::attribute::K8S_CLUSTER_NAME,
902        "g.co/r/k8s_container/cluster_name",
903    ),
904    (
905        semconv::attribute::K8S_NAMESPACE_NAME,
906        "g.co/r/k8s_container/namespace",
907    ),
908    (
909        semconv::attribute::K8S_POD_NAME,
910        "g.co/r/k8s_container/pod_name",
911    ),
912    (
913        semconv::attribute::K8S_CONTAINER_NAME,
914        "g.co/r/k8s_container/container_name",
915    ),
916    (semconv::trace::HTTP_ROUTE, "/http/route"),
917    (HTTP_PATH, GCP_HTTP_PATH),
918];
919
920const HTTP_PATH: &str = "http.path";
921const GCP_HTTP_PATH: &str = "/http/path";
922
923impl From<opentelemetry::trace::SpanKind> for SpanKind {
924    fn from(span_kind: opentelemetry::trace::SpanKind) -> Self {
925        match span_kind {
926            opentelemetry::trace::SpanKind::Client => SpanKind::Client,
927            opentelemetry::trace::SpanKind::Server => SpanKind::Server,
928            opentelemetry::trace::SpanKind::Producer => SpanKind::Producer,
929            opentelemetry::trace::SpanKind::Consumer => SpanKind::Consumer,
930            opentelemetry::trace::SpanKind::Internal => SpanKind::Internal,
931        }
932    }
933}
934
935fn status(value: opentelemetry::trace::Status) -> Option<Status> {
936    match value {
937        opentelemetry::trace::Status::Ok => Some(Status {
938            code: Code::Ok as i32,
939            message: "".to_owned(),
940            details: vec![],
941        }),
942        opentelemetry::trace::Status::Unset => None,
943        opentelemetry::trace::Status::Error { description } => Some(Status {
944            code: Code::Unknown as i32,
945            message: description.into(),
946            details: vec![],
947        }),
948    }
949}
950const TRACE_APPEND: &str = "https://www.googleapis.com/auth/trace.append";
951const LOGGING_WRITE: &str = "https://www.googleapis.com/auth/logging.write";
952const MAX_ATTRIBUTES_PER_SPAN: usize = 32;
953
954#[cfg(test)]
955mod tests {
956    use super::*;
957    use opentelemetry::{KeyValue, Value};
958    use opentelemetry_semantic_conventions as semcov;
959
960    #[test]
961    fn test_attributes_mapping() {
962        let capacity = 10;
963        let mut attributes = Vec::with_capacity(capacity);
964
965        //	hostAttribute       = "http.host"
966        attributes.push(KeyValue::new(
967            semconv::attribute::HTTP_HOST,
968            "example.com:8080",
969        ));
970
971        // 	methodAttribute     = "http.method"
972        attributes.push(KeyValue::new(semcov::attribute::HTTP_METHOD, "POST"));
973
974        // 	pathAttribute       = "http.path"
975        attributes.push(KeyValue::new(HTTP_PATH, "/path/12314/?q=ddds#123"));
976
977        // 	urlAttribute        = "http.url"
978        attributes.push(KeyValue::new(
979            semcov::attribute::HTTP_URL,
980            "https://example.com:8080/webshop/articles/4?s=1",
981        ));
982
983        // 	userAgentAttribute  = "http.user_agent"
984        attributes.push(KeyValue::new(
985            semconv::attribute::HTTP_USER_AGENT,
986            "CERN-LineMode/2.15 libwww/2.17b3",
987        ));
988
989        // 	statusCodeAttribute = "http.status_code"
990        attributes.push(KeyValue::new(semcov::attribute::HTTP_STATUS_CODE, 200i64));
991
992        // 	statusCodeAttribute = "http.route"
993        attributes.push(KeyValue::new(
994            semcov::trace::HTTP_ROUTE,
995            "/webshop/articles/:article_id",
996        ));
997
998        // 	serviceAttribute    = "service.name"
999        let resources = Resource::builder_empty()
1000            .with_attributes([KeyValue::new(
1001                semcov::resource::SERVICE_NAME,
1002                "Test Service Name",
1003            )])
1004            .build();
1005
1006        let actual = Attributes::new(attributes, Some(&resources));
1007        assert_eq!(actual.attribute_map.len(), 8);
1008        assert_eq!(actual.dropped_attributes_count, 0);
1009        assert_eq!(
1010            actual.attribute_map.get("/http/host"),
1011            Some(&AttributeValue::from(Value::String(
1012                "example.com:8080".into()
1013            )))
1014        );
1015        assert_eq!(
1016            actual.attribute_map.get("/http/method"),
1017            Some(&AttributeValue::from(Value::String("POST".into()))),
1018        );
1019        assert_eq!(
1020            actual.attribute_map.get("/http/path"),
1021            Some(&AttributeValue::from(Value::String(
1022                "/path/12314/?q=ddds#123".into()
1023            ))),
1024        );
1025        assert_eq!(
1026            actual.attribute_map.get("/http/route"),
1027            Some(&AttributeValue::from(Value::String(
1028                "/webshop/articles/:article_id".into()
1029            ))),
1030        );
1031        assert_eq!(
1032            actual.attribute_map.get("/http/url"),
1033            Some(&AttributeValue::from(Value::String(
1034                "https://example.com:8080/webshop/articles/4?s=1".into(),
1035            ))),
1036        );
1037        assert_eq!(
1038            actual.attribute_map.get("/http/user_agent"),
1039            Some(&AttributeValue::from(Value::String(
1040                "CERN-LineMode/2.15 libwww/2.17b3".into()
1041            ))),
1042        );
1043        assert_eq!(
1044            actual.attribute_map.get("/http/status_code"),
1045            Some(&AttributeValue::from(Value::I64(200))),
1046        );
1047    }
1048
1049    #[test]
1050    fn test_too_many() {
1051        let resources = Resource::builder_empty()
1052            .with_attributes([KeyValue::new(
1053                semconv::attribute::USER_AGENT_ORIGINAL,
1054                "Test Service Name UA",
1055            )])
1056            .build();
1057        let mut attributes = Vec::with_capacity(32);
1058        for i in 0..32 {
1059            attributes.push(KeyValue::new(
1060                format!("key{}", i),
1061                Value::String(format!("value{}", i).into()),
1062            ));
1063        }
1064
1065        let actual = Attributes::new(attributes, Some(&resources));
1066        assert_eq!(actual.attribute_map.len(), 32);
1067        assert_eq!(actual.dropped_attributes_count, 1);
1068        assert_eq!(
1069            actual.attribute_map.get("/http/user_agent"),
1070            Some(&AttributeValue::from(Value::String(
1071                "Test Service Name UA".into()
1072            ))),
1073        );
1074    }
1075
1076    #[test]
1077    fn test_attributes_mapping_http_target() {
1078        let attributes = vec![KeyValue::new(
1079            semcov::attribute::HTTP_TARGET,
1080            "/path/12314/?q=ddds#123",
1081        )];
1082
1083        //	hostAttribute       = "http.target"
1084
1085        let resources = Resource::builder_empty().with_attributes([]).build();
1086        let actual = Attributes::new(attributes, Some(&resources));
1087        assert_eq!(actual.attribute_map.len(), 1);
1088        assert_eq!(actual.dropped_attributes_count, 0);
1089        assert_eq!(
1090            actual.attribute_map.get("/http/path"),
1091            Some(&AttributeValue::from(Value::String(
1092                "/path/12314/?q=ddds#123".into()
1093            ))),
1094        );
1095    }
1096
1097    #[test]
1098    fn test_attributes_mapping_dropped_attributes_count() {
1099        let attributes = vec![KeyValue::new("answer", Value::I64(42)),KeyValue::new("long_attribute_key_dvwmacxpeefbuemoxljmqvldjxmvvihoeqnuqdsyovwgljtnemouidabhkmvsnauwfnaihekcfwhugejboiyfthyhmkpsaxtidlsbwsmirebax", Value::String("Some value".into()))];
1100
1101        let resources = Resource::builder_empty().with_attributes([]).build();
1102        let actual = Attributes::new(attributes, Some(&resources));
1103        assert_eq!(
1104            actual,
1105            Attributes {
1106                attribute_map: HashMap::from([(
1107                    "answer".into(),
1108                    AttributeValue::from(Value::I64(42))
1109                ),]),
1110                dropped_attributes_count: 1,
1111            }
1112        );
1113        assert_eq!(actual.attribute_map.len(), 1);
1114        assert_eq!(actual.dropped_attributes_count, 1);
1115    }
1116}