opentelemetry_stackdriver/
lib.rs

1#![cfg(not(doctest))]
2// unfortunately the proto code includes comments from the google proto files
3// that are interpreted as "doc tests" and will fail to build.
4// When this PR is merged we should be able to remove this attribute:
5// https://github.com/danburkert/prost/pull/291
6#![allow(
7    clippy::doc_lazy_continuation,
8    deprecated,
9    rustdoc::bare_urls,
10    rustdoc::broken_intra_doc_links,
11    rustdoc::invalid_rust_codeblocks
12)]
13
14use std::{
15    borrow::Cow,
16    collections::HashMap,
17    fmt,
18    future::Future,
19    sync::{
20        atomic::{AtomicUsize, Ordering},
21        Arc, RwLock,
22    },
23    time::{Duration, Instant},
24};
25
26use futures_util::stream::StreamExt;
27use opentelemetry::{otel_error, trace::SpanId, Key, KeyValue, Value};
28use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
29use opentelemetry_sdk::{
30    trace::{SpanData, SpanExporter},
31    Resource,
32};
33use opentelemetry_semantic_conventions as semconv;
34use thiserror::Error;
35#[cfg(feature = "gcp-authorizer")]
36use tonic::metadata::MetadataValue;
37#[cfg(any(
38    feature = "tls-ring",
39    feature = "tls-native-roots",
40    feature = "tls-webpki-roots"
41))]
42use tonic::transport::ClientTlsConfig;
43use tonic::{transport::Channel, Code, Request};
44
45#[allow(clippy::derive_partial_eq_without_eq)] // tonic doesn't derive Eq for generated types
46#[allow(clippy::doc_overindented_list_items)]
47pub mod proto;
48
49#[cfg(feature = "propagator")]
50pub mod google_trace_context_propagator;
51
52use proto::devtools::cloudtrace::v2::span::time_event::Annotation;
53use proto::devtools::cloudtrace::v2::span::{
54    Attributes, Link, Links, SpanKind, TimeEvent, TimeEvents,
55};
56use proto::devtools::cloudtrace::v2::trace_service_client::TraceServiceClient;
57use proto::devtools::cloudtrace::v2::{
58    AttributeValue, BatchWriteSpansRequest, Span, TruncatableString,
59};
60use proto::logging::v2::{
61    log_entry::Payload, logging_service_v2_client::LoggingServiceV2Client, LogEntry,
62    LogEntrySourceLocation, WriteLogEntriesRequest,
63};
64use proto::rpc::Status;
65
66/// Exports opentelemetry tracing spans to Google StackDriver.
67///
68/// As of the time of this writing, the opentelemetry crate exposes no link information
69/// so this struct does not send link information.
70#[derive(Clone)]
71pub struct StackDriverExporter {
72    tx: futures_channel::mpsc::Sender<Vec<SpanData>>,
73    pending_count: Arc<AtomicUsize>,
74    maximum_shutdown_duration: Duration,
75    resource: Arc<RwLock<Option<Resource>>>,
76}
77
78impl StackDriverExporter {
79    pub fn builder() -> Builder {
80        Builder::default()
81    }
82
83    pub fn pending_count(&self) -> usize {
84        self.pending_count.load(Ordering::Relaxed)
85    }
86}
87
88impl SpanExporter for StackDriverExporter {
89    async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
90        match self.tx.clone().try_send(batch) {
91            Err(e) => Err(OTelSdkError::InternalFailure(format!("{:?}", e))),
92            Ok(()) => {
93                self.pending_count.fetch_add(1, Ordering::Relaxed);
94                Ok(())
95            }
96        }
97    }
98
99    fn shutdown(&mut self) -> OTelSdkResult {
100        let start = Instant::now();
101        while (Instant::now() - start) < self.maximum_shutdown_duration && self.pending_count() > 0
102        {
103            std::thread::yield_now();
104            // Spin for a bit and give the inner export some time to upload, with a timeout.
105        }
106        Ok(())
107    }
108
109    fn set_resource(&mut self, resource: &Resource) {
110        match self.resource.write() {
111            Ok(mut guard) => *guard = Some(resource.clone()),
112            Err(poisoned) => *poisoned.into_inner() = Some(resource.clone()),
113        }
114    }
115}
116
117impl fmt::Debug for StackDriverExporter {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        #[allow(clippy::unneeded_field_pattern)]
120        let Self {
121            tx: _,
122            pending_count,
123            maximum_shutdown_duration,
124            resource: _,
125        } = self;
126        f.debug_struct("StackDriverExporter")
127            .field("tx", &"(elided)")
128            .field("pending_count", pending_count)
129            .field("maximum_shutdown_duration", maximum_shutdown_duration)
130            .finish()
131    }
132}
133
134/// Helper type to build a `StackDriverExporter`.
135#[derive(Clone, Default)]
136pub struct Builder {
137    maximum_shutdown_duration: Option<Duration>,
138    num_concurrent_requests: Option<usize>,
139    log_context: Option<LogContext>,
140}
141
142impl Builder {
143    /// Set the maximum shutdown duration to export all the remaining data.
144    ///
145    /// If not set, defaults to 5 seconds.
146    pub fn maximum_shutdown_duration(mut self, duration: Duration) -> Self {
147        self.maximum_shutdown_duration = Some(duration);
148        self
149    }
150
151    /// Set the number of concurrent requests.
152    ///
153    /// If `num_concurrent_requests` is set to `0` or `None` then no limit is enforced.
154    pub fn num_concurrent_requests(mut self, num_concurrent_requests: usize) -> Self {
155        self.num_concurrent_requests = Some(num_concurrent_requests);
156        self
157    }
158
159    /// Enable writing log entries with the given `log_context`.
160    pub fn log_context(mut self, log_context: LogContext) -> Self {
161        self.log_context = Some(log_context);
162        self
163    }
164
165    pub async fn build<A: Authorizer>(
166        self,
167        authenticator: A,
168    ) -> Result<(StackDriverExporter, impl Future<Output = ()>), Error>
169    where
170        Error: From<A::Error>,
171    {
172        let Self {
173            maximum_shutdown_duration,
174            num_concurrent_requests,
175            log_context,
176        } = self;
177        let uri = http::uri::Uri::from_static("https://cloudtrace.googleapis.com:443");
178
179        #[cfg(any(
180            feature = "tls-ring",
181            feature = "tls-native-roots",
182            feature = "tls-webpki-roots"
183        ))]
184        let tls_config = ClientTlsConfig::new().with_enabled_roots();
185
186        let trace_channel_builder = Channel::builder(uri);
187        #[cfg(any(
188            feature = "tls-ring",
189            feature = "tls-native-roots",
190            feature = "tls-webpki-roots"
191        ))]
192        let trace_channel_builder = trace_channel_builder
193            .tls_config(tls_config.clone())
194            .map_err(|e| Error::Transport(e.into()))?;
195
196        let trace_channel = trace_channel_builder
197            .connect()
198            .await
199            .map_err(|e| Error::Transport(e.into()))?;
200
201        let log_client = match log_context {
202            Some(log_context) => {
203                let log_channel_builder = Channel::builder(http::uri::Uri::from_static(
204                    "https://logging.googleapis.com:443",
205                ));
206                #[cfg(any(
207                    feature = "tls-ring",
208                    feature = "tls-native-roots",
209                    feature = "tls-webpki-roots"
210                ))]
211                let log_channel_builder = log_channel_builder
212                    .tls_config(tls_config)
213                    .map_err(|e| Error::Transport(e.into()))?;
214
215                let log_channel = log_channel_builder
216                    .connect()
217                    .await
218                    .map_err(|e| Error::Transport(e.into()))?;
219
220                Some(LogClient {
221                    client: LoggingServiceV2Client::new(log_channel),
222                    context: Arc::new(InternalLogContext::from(log_context)),
223                })
224            }
225            None => None,
226        };
227
228        let (tx, rx) = futures_channel::mpsc::channel(64);
229        let pending_count = Arc::new(AtomicUsize::new(0));
230        let scopes = Arc::new(match log_client {
231            Some(_) => vec![TRACE_APPEND, LOGGING_WRITE],
232            None => vec![TRACE_APPEND],
233        });
234
235        let count_clone = pending_count.clone();
236        let resource = Arc::new(RwLock::new(None));
237        let ctx_resource = resource.clone();
238        let future = async move {
239            let trace_client = TraceServiceClient::new(trace_channel);
240            let authorizer = &authenticator;
241            let log_client = log_client.clone();
242            rx.for_each_concurrent(num_concurrent_requests, move |batch| {
243                let trace_client = trace_client.clone();
244                let log_client = log_client.clone();
245                let pending_count = count_clone.clone();
246                let scopes = scopes.clone();
247                let resource = ctx_resource.clone();
248                ExporterContext {
249                    trace_client,
250                    log_client,
251                    authorizer,
252                    pending_count,
253                    scopes,
254                    resource,
255                }
256                .export(batch)
257            })
258            .await
259        };
260
261        let exporter = StackDriverExporter {
262            tx,
263            pending_count,
264            maximum_shutdown_duration: maximum_shutdown_duration
265                .unwrap_or_else(|| Duration::from_secs(5)),
266            resource,
267        };
268
269        Ok((exporter, future))
270    }
271}
272
273struct ExporterContext<'a, A> {
274    trace_client: TraceServiceClient<Channel>,
275    log_client: Option<LogClient>,
276    authorizer: &'a A,
277    pending_count: Arc<AtomicUsize>,
278    scopes: Arc<Vec<&'static str>>,
279    resource: Arc<RwLock<Option<Resource>>>,
280}
281
282impl<A: Authorizer> ExporterContext<'_, A>
283where
284    Error: From<A::Error>,
285{
286    async fn export(mut self, batch: Vec<SpanData>) {
287        use proto::devtools::cloudtrace::v2::span::time_event::Value;
288
289        let mut entries = Vec::new();
290        let mut spans = Vec::with_capacity(batch.len());
291        for span in batch {
292            let trace_id = hex::encode(span.span_context.trace_id().to_bytes());
293            let span_id = hex::encode(span.span_context.span_id().to_bytes());
294            let time_event = match &self.log_client {
295                None => span
296                    .events
297                    .into_iter()
298                    .map(|event| TimeEvent {
299                        time: Some(event.timestamp.into()),
300                        value: Some(Value::Annotation(Annotation {
301                            description: Some(to_truncate(event.name.into_owned())),
302                            ..Default::default()
303                        })),
304                    })
305                    .collect(),
306                Some(client) => {
307                    entries.extend(span.events.into_iter().map(|event| {
308                        let (mut level, mut target, mut labels) =
309                            (LogSeverity::Default, None, HashMap::default());
310                        for kv in event.attributes {
311                            match kv.key.as_str() {
312                                "level" => {
313                                    level = match kv.value.as_str().as_ref() {
314                                        "DEBUG" | "TRACE" => LogSeverity::Debug,
315                                        "INFO" => LogSeverity::Info,
316                                        "WARN" => LogSeverity::Warning,
317                                        "ERROR" => LogSeverity::Error,
318                                        _ => LogSeverity::Default, // tracing::Level is limited to the above 5
319                                    }
320                                }
321                                "target" => target = Some(kv.value.as_str().into_owned()),
322                                key => {
323                                    labels.insert(key.to_owned(), kv.value.as_str().into_owned());
324                                }
325                            }
326                        }
327                        let project_id = self.authorizer.project_id();
328                        let log_id = &client.context.log_id;
329                        LogEntry {
330                            log_name: format!("projects/{project_id}/logs/{log_id}"),
331                            resource: Some(client.context.resource.clone()),
332                            severity: level as i32,
333                            timestamp: Some(event.timestamp.into()),
334                            labels,
335                            trace: format!("projects/{project_id}/traces/{trace_id}"),
336                            span_id: span_id.clone(),
337                            source_location: target.map(|target| LogEntrySourceLocation {
338                                file: String::new(),
339                                line: 0,
340                                function: target,
341                            }),
342                            payload: Some(Payload::TextPayload(event.name.into_owned())),
343                            // severity, source_location, text_payload
344                            ..Default::default()
345                        }
346                    }));
347
348                    vec![]
349                }
350            };
351
352            let resource = self.resource.read().ok();
353            let attributes = match resource {
354                Some(resource) => Attributes::new(span.attributes, resource.as_ref()),
355                None => Attributes::new(span.attributes, None),
356            };
357
358            spans.push(Span {
359                name: format!(
360                    "projects/{}/traces/{}/spans/{}",
361                    self.authorizer.project_id(),
362                    hex::encode(span.span_context.trace_id().to_bytes()),
363                    hex::encode(span.span_context.span_id().to_bytes())
364                ),
365                display_name: Some(to_truncate(span.name.into_owned())),
366                span_id: hex::encode(span.span_context.span_id().to_bytes()),
367                // From the API docs: If this is a root span,
368                // then this field must be empty.
369                parent_span_id: match span.parent_span_id {
370                    SpanId::INVALID => "".to_owned(),
371                    _ => hex::encode(span.parent_span_id.to_bytes()),
372                },
373                start_time: Some(span.start_time.into()),
374                end_time: Some(span.end_time.into()),
375                attributes: Some(attributes),
376                time_events: Some(TimeEvents {
377                    time_event,
378                    ..Default::default()
379                }),
380                links: transform_links(&span.links),
381                status: status(span.status),
382                span_kind: SpanKind::from(span.span_kind) as i32,
383                ..Default::default()
384            });
385        }
386
387        let mut req = Request::new(BatchWriteSpansRequest {
388            name: format!("projects/{}", self.authorizer.project_id()),
389            spans,
390        });
391
392        self.pending_count.fetch_sub(1, Ordering::Relaxed);
393        if let Err(e) = self.authorizer.authorize(&mut req, &self.scopes).await {
394            otel_error!(name: "ExportAuthorizeError", error = format!("{:?}", e));
395        } else if let Err(e) = self.trace_client.batch_write_spans(req).await {
396            otel_error!(name: "ExportTransportError", error = format!("{:?}", e));
397        }
398
399        let client = match &mut self.log_client {
400            Some(client) => client,
401            None => return,
402        };
403
404        let mut req = Request::new(WriteLogEntriesRequest {
405            log_name: format!(
406                "projects/{}/logs/{}",
407                self.authorizer.project_id(),
408                client.context.log_id,
409            ),
410            entries,
411            dry_run: false,
412            labels: HashMap::default(),
413            partial_success: true,
414            resource: None,
415        });
416
417        if let Err(e) = self.authorizer.authorize(&mut req, &self.scopes).await {
418            otel_error!(name: "ExportAuthorizeError", error = format!("{:?}", e));
419        } else if let Err(e) = client.client.write_log_entries(req).await {
420            otel_error!(name: "ExportTransportError", error =  format!("{:?}", e));
421        }
422    }
423}
424
/// [`Authorizer`] backed by a `gcp_auth` token provider.
#[cfg(feature = "gcp-authorizer")]
pub struct GcpAuthorizer {
    /// Source of the access tokens attached to outgoing requests.
    provider: Arc<dyn gcp_auth::TokenProvider>,
    /// Project ID reported by `project_id`.
    project_id: Arc<str>,
}

#[cfg(feature = "gcp-authorizer")]
impl GcpAuthorizer {
    /// Creates an authorizer from the provider returned by
    /// `gcp_auth::provider()`, using the project ID that provider reports.
    ///
    /// # Errors
    ///
    /// Returns `Error::Authorizer` when obtaining the provider or its project
    /// ID fails.
    pub async fn new() -> Result<Self, Error> {
        let provider = match gcp_auth::provider().await {
            Ok(provider) => provider,
            Err(e) => return Err(Error::Authorizer(e.into())),
        };

        let project_id = match provider.project_id().await {
            Ok(project_id) => project_id,
            Err(e) => return Err(Error::Authorizer(e.into())),
        };

        Ok(Self::from_gcp_auth(provider, project_id))
    }

    /// Creates an authorizer from an existing token provider and project ID.
    pub fn from_gcp_auth(provider: Arc<dyn gcp_auth::TokenProvider>, project_id: Arc<str>) -> Self {
        Self {
            project_id,
            provider,
        }
    }
}
455
#[cfg(feature = "gcp-authorizer")]
impl Authorizer for GcpAuthorizer {
    type Error = Error;

    /// Returns the project ID this authorizer was created with.
    fn project_id(&self) -> &str {
        &self.project_id
    }

    /// Fetches a token for `scopes` and sets it as the request's
    /// `authorization` metadata in `Bearer <token>` form.
    ///
    /// # Errors
    ///
    /// Returns `Error::Authorizer` when the provider cannot supply a token or
    /// when the token cannot be represented as an ASCII metadata value.
    async fn authorize<T: Send + Sync>(
        &self,
        req: &mut Request<T>,
        scopes: &[&str],
    ) -> Result<(), Self::Error> {
        let token = self
            .provider
            .token(scopes)
            .await
            .map_err(|e| Error::Authorizer(e.into()))?;

        // Report an unusable token as an error instead of panicking inside
        // the export task.
        let bearer =
            MetadataValue::<tonic::metadata::Ascii>::try_from(format!("Bearer {}", token.as_str()))
                .map_err(|e| Error::Authorizer(e.into()))?;
        req.metadata_mut().insert("authorization", bearer);

        Ok(())
    }
}
483
/// Supplies the project ID and the request authorization used by the exporter.
pub trait Authorizer: Sync + Send + 'static {
    /// Error returned when a request cannot be authorized.
    type Error: std::error::Error + fmt::Debug + Send + Sync;

    /// Project ID used to build the `projects/{project_id}/...` names sent
    /// with spans and log entries.
    fn project_id(&self) -> &str;
    /// Authorizes `request` for the given `scopes` before it is sent.
    ///
    /// The exporter calls this once per outgoing request and does not send
    /// the request when an error is returned.
    fn authorize<T: Send + Sync>(
        &self,
        request: &mut Request<T>,
        scopes: &[&str],
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
}
494
495impl From<Value> for AttributeValue {
496    fn from(v: Value) -> AttributeValue {
497        use proto::devtools::cloudtrace::v2::attribute_value;
498        let new_value = match v {
499            Value::Bool(v) => attribute_value::Value::BoolValue(v),
500            Value::F64(v) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
501            Value::I64(v) => attribute_value::Value::IntValue(v),
502            Value::String(v) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
503            Value::Array(_) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
504            _ => attribute_value::Value::StringValue(to_truncate("".to_string())),
505        };
506        AttributeValue {
507            value: Some(new_value),
508        }
509    }
510}
511
512fn to_truncate(s: String) -> TruncatableString {
513    TruncatableString {
514        value: s,
515        ..Default::default()
516    }
517}
518
/// Errors produced while building the exporter or authorizing requests.
#[derive(Debug, Error)]
pub enum Error {
    /// An authorizer failed, e.g. while obtaining a provider, project ID or token.
    #[error("authorizer error: {0}")]
    Authorizer(#[source] Box<dyn std::error::Error + Send + Sync>),
    /// An I/O error, converted from `std::io::Error`.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),
    /// Any other boxed error.
    #[error("{0}")]
    Other(#[from] Box<dyn std::error::Error + Send + Sync>),
    /// Configuring TLS for, or connecting, a gRPC channel failed.
    #[error("tonic error: {0}")]
    Transport(#[source] Box<dyn std::error::Error + Send + Sync>),
}

impl opentelemetry_sdk::ExportError for Error {
    /// Name under which this exporter identifies itself.
    fn exporter_name(&self) -> &'static str {
        "stackdriver"
    }
}
536
/// Severity of a log entry, as defined in
/// <https://cloud.google.com/logging/docs/reference/v2/rpc/google.logging.type#google.logging.type.LogSeverity>.
///
/// The discriminants are the numeric values sent in `LogEntry::severity`.
/// Only the severities that the `level` event attribute is mapped to are listed.
enum LogSeverity {
    Default = 0,
    Debug = 100,
    Info = 200,
    Warning = 400,
    Error = 500,
}
545
/// Logging client bundled with the context its log entries are written under.
#[derive(Clone)]
struct LogClient {
    /// gRPC client used to write log entries.
    client: LoggingServiceV2Client<Channel>,
    /// Log ID and monitored resource, shared between clones.
    context: Arc<InternalLogContext>,
}
551
/// [`LogContext`] with its resource already converted to the protobuf type.
struct InternalLogContext {
    /// Log ID used in `projects/{project_id}/logs/{log_id}` names.
    log_id: String,
    /// Monitored resource attached to every log entry.
    resource: proto::api::MonitoredResource,
}
556
/// Settings for writing span events as log entries.
#[derive(Clone)]
pub struct LogContext {
    /// Log ID used in `projects/{project_id}/logs/{log_id}` names.
    pub log_id: String,
    /// Monitored resource the log entries are attributed to.
    pub resource: MonitoredResource,
}
562
563impl From<LogContext> for InternalLogContext {
564    fn from(cx: LogContext) -> Self {
565        let mut labels = HashMap::default();
566        let resource = match cx.resource {
567            MonitoredResource::AppEngine {
568                project_id,
569                module_id,
570                version_id,
571                zone,
572            } => {
573                labels.insert("project_id".to_string(), project_id);
574                if let Some(module_id) = module_id {
575                    labels.insert("module_id".to_string(), module_id);
576                }
577                if let Some(version_id) = version_id {
578                    labels.insert("version_id".to_string(), version_id);
579                }
580                if let Some(zone) = zone {
581                    labels.insert("zone".to_string(), zone);
582                }
583
584                proto::api::MonitoredResource {
585                    r#type: "gae_app".to_owned(),
586                    labels,
587                }
588            }
589            MonitoredResource::CloudFunction {
590                project_id,
591                function_name,
592                region,
593            } => {
594                labels.insert("project_id".to_string(), project_id);
595                if let Some(function_name) = function_name {
596                    labels.insert("function_name".to_string(), function_name);
597                }
598                if let Some(region) = region {
599                    labels.insert("region".to_string(), region);
600                }
601
602                proto::api::MonitoredResource {
603                    r#type: "cloud_function".to_owned(),
604                    labels,
605                }
606            }
607            MonitoredResource::CloudRunJob {
608                project_id,
609                job_name,
610                location,
611            } => {
612                labels.insert("project_id".to_string(), project_id);
613                if let Some(job_name) = job_name {
614                    labels.insert("job_name".to_string(), job_name);
615                }
616                if let Some(location) = location {
617                    labels.insert("location".to_string(), location);
618                }
619
620                proto::api::MonitoredResource {
621                    r#type: "cloud_run_job".to_owned(),
622                    labels,
623                }
624            }
625            MonitoredResource::CloudRunRevision {
626                project_id,
627                service_name,
628                revision_name,
629                location,
630                configuration_name,
631            } => {
632                labels.insert("project_id".to_string(), project_id);
633                if let Some(service_name) = service_name {
634                    labels.insert("service_name".to_string(), service_name);
635                }
636                if let Some(revision_name) = revision_name {
637                    labels.insert("revision_name".to_string(), revision_name);
638                }
639                if let Some(location) = location {
640                    labels.insert("location".to_string(), location);
641                }
642                if let Some(configuration_name) = configuration_name {
643                    labels.insert("configuration_name".to_string(), configuration_name);
644                }
645
646                proto::api::MonitoredResource {
647                    r#type: "cloud_run_revision".to_owned(),
648                    labels,
649                }
650            }
651
652            MonitoredResource::ComputeEngine {
653                project_id,
654                instance_id,
655                zone,
656            } => {
657                labels.insert("project_id".to_string(), project_id);
658                if let Some(instance_id) = instance_id {
659                    labels.insert("instance_id".to_string(), instance_id);
660                }
661                if let Some(zone) = zone {
662                    labels.insert("zone".to_string(), zone);
663                }
664
665                proto::api::MonitoredResource {
666                    r#type: "gce_instance".to_owned(),
667                    labels,
668                }
669            }
670
671            MonitoredResource::GenericNode {
672                project_id,
673                location,
674                namespace,
675                node_id,
676            } => {
677                labels.insert("project_id".to_string(), project_id);
678                if let Some(location) = location {
679                    labels.insert("location".to_string(), location);
680                }
681                if let Some(namespace) = namespace {
682                    labels.insert("namespace".to_string(), namespace);
683                }
684                if let Some(node_id) = node_id {
685                    labels.insert("node_id".to_string(), node_id);
686                }
687
688                proto::api::MonitoredResource {
689                    r#type: "generic_node".to_owned(),
690                    labels,
691                }
692            }
693            MonitoredResource::GenericTask {
694                project_id,
695                location,
696                namespace,
697                job,
698                task_id,
699            } => {
700                labels.insert("project_id".to_owned(), project_id);
701                if let Some(location) = location {
702                    labels.insert("location".to_owned(), location);
703                }
704                if let Some(namespace) = namespace {
705                    labels.insert("namespace".to_owned(), namespace);
706                }
707                if let Some(job) = job {
708                    labels.insert("job".to_owned(), job);
709                }
710                if let Some(task_id) = task_id {
711                    labels.insert("task_id".to_owned(), task_id);
712                }
713
714                proto::api::MonitoredResource {
715                    r#type: "generic_task".to_owned(),
716                    labels,
717                }
718            }
719            MonitoredResource::Global { project_id } => {
720                labels.insert("project_id".to_owned(), project_id);
721                proto::api::MonitoredResource {
722                    r#type: "global".to_owned(),
723                    labels,
724                }
725            }
726            MonitoredResource::KubernetesEngine {
727                project_id,
728                cluster_name,
729                location,
730                pod_name,
731                namespace_name,
732                container_name,
733            } => {
734                labels.insert("project_id".to_string(), project_id);
735                if let Some(cluster_name) = cluster_name {
736                    labels.insert("cluster_name".to_string(), cluster_name);
737                }
738                if let Some(location) = location {
739                    labels.insert("location".to_string(), location);
740                }
741                if let Some(pod_name) = pod_name {
742                    labels.insert("pod_name".to_string(), pod_name);
743                }
744                if let Some(namespace_name) = namespace_name {
745                    labels.insert("namespace_name".to_string(), namespace_name);
746                }
747                if let Some(container_name) = container_name {
748                    labels.insert("container_name".to_string(), container_name);
749                }
750
751                proto::api::MonitoredResource {
752                    r#type: "k8s_container".to_owned(),
753                    labels,
754                }
755            }
756        };
757
758        Self {
759            log_id: cx.log_id,
760            resource,
761        }
762    }
763}
764
/// A description of a `MonitoredResource`.
///
/// Possible values are listed in the [API documentation](https://cloud.google.com/logging/docs/api/v2/resource-list).
/// Please submit an issue or pull request if you want to use a resource type not listed here.
///
/// Each field is sent as a resource label of the same name; optional fields
/// that are `None` are omitted.
#[derive(Clone)]
pub enum MonitoredResource {
    /// Sent with resource type `gae_app`.
    AppEngine {
        project_id: String,
        module_id: Option<String>,
        version_id: Option<String>,
        zone: Option<String>,
    },
    /// Sent with resource type `cloud_function`.
    CloudFunction {
        project_id: String,
        function_name: Option<String>,
        region: Option<String>,
    },
    /// Sent with resource type `cloud_run_job`.
    CloudRunJob {
        project_id: String,
        job_name: Option<String>,
        location: Option<String>,
    },
    /// Sent with resource type `cloud_run_revision`.
    CloudRunRevision {
        project_id: String,
        service_name: Option<String>,
        revision_name: Option<String>,
        location: Option<String>,
        configuration_name: Option<String>,
    },
    /// Sent with resource type `gce_instance`.
    ComputeEngine {
        project_id: String,
        instance_id: Option<String>,
        zone: Option<String>,
    },
    /// Sent with resource type `k8s_container`.
    KubernetesEngine {
        project_id: String,
        location: Option<String>,
        cluster_name: Option<String>,
        namespace_name: Option<String>,
        pod_name: Option<String>,
        container_name: Option<String>,
    },
    /// Sent with resource type `generic_node`.
    GenericNode {
        project_id: String,
        location: Option<String>,
        namespace: Option<String>,
        node_id: Option<String>,
    },
    /// Sent with resource type `generic_task`.
    GenericTask {
        project_id: String,
        location: Option<String>,
        namespace: Option<String>,
        job: Option<String>,
        task_id: Option<String>,
    },
    /// Sent with resource type `global`.
    Global {
        project_id: String,
    },
}
824
825impl Attributes {
826    /// Combines `EvictedHashMap` and `Resource` attributes into a maximum of 32.
827    ///
828    /// The `Resource` takes precedence over the `EvictedHashMap` attributes.
829    fn new(attributes: Vec<KeyValue>, resource: Option<&Resource>) -> Self {
830        let mut new = Self {
831            dropped_attributes_count: 0,
832            attribute_map: HashMap::with_capacity(Ord::min(
833                MAX_ATTRIBUTES_PER_SPAN,
834                attributes.len() + resource.map_or(0, |r| r.len()),
835            )),
836        };
837
838        if let Some(resource) = resource {
839            for (k, v) in resource.iter() {
840                new.push(Cow::Borrowed(k), Cow::Borrowed(v));
841            }
842        }
843
844        for kv in attributes {
845            new.push(Cow::Owned(kv.key), Cow::Owned(kv.value));
846        }
847
848        new
849    }
850
851    fn push(&mut self, key: Cow<'_, Key>, value: Cow<'_, Value>) {
852        if self.attribute_map.len() >= MAX_ATTRIBUTES_PER_SPAN {
853            self.dropped_attributes_count += 1;
854            return;
855        }
856
857        let key_str = key.as_str();
858        if key_str.len() > 128 {
859            self.dropped_attributes_count += 1;
860            return;
861        }
862
863        for (otel_key, gcp_key) in KEY_MAP {
864            if otel_key == key_str {
865                self.attribute_map
866                    .insert(gcp_key.to_owned(), value.into_owned().into());
867                return;
868            }
869        }
870
871        self.attribute_map.insert(
872            match key {
873                Cow::Owned(k) => k.to_string(),
874                Cow::Borrowed(k) => k.to_string(),
875            },
876            value.into_owned().into(),
877        );
878    }
879}
880
881fn transform_links(links: &opentelemetry_sdk::trace::SpanLinks) -> Option<Links> {
882    if links.is_empty() {
883        return None;
884    }
885
886    Some(Links {
887        dropped_links_count: links.dropped_count as i32,
888        link: links
889            .iter()
890            .map(|link| Link {
891                trace_id: hex::encode(link.span_context.trace_id().to_bytes()),
892                span_id: hex::encode(link.span_context.span_id().to_bytes()),
893                ..Default::default()
894            })
895            .collect(),
896    })
897}
898
899// Map conventional OpenTelemetry keys to their GCP counterparts.
900//
901// https://cloud.google.com/trace/docs/trace-labels
902const KEY_MAP: [(&str, &str); 19] = [
903    (HTTP_PATH, GCP_HTTP_PATH),
904    (semconv::attribute::HTTP_HOST, "/http/host"),
905    ("http.request.header.host", "/http/host"),
906    (semconv::attribute::HTTP_METHOD, "/http/method"),
907    (semconv::attribute::HTTP_REQUEST_METHOD, "/http/method"),
908    (semconv::attribute::HTTP_TARGET, "/http/path"),
909    (semconv::attribute::URL_PATH, "/http/path"),
910    (semconv::attribute::HTTP_URL, "/http/url"),
911    (semconv::attribute::URL_FULL, "/http/url"),
912    (semconv::attribute::HTTP_USER_AGENT, "/http/user_agent"),
913    (semconv::attribute::USER_AGENT_ORIGINAL, "/http/user_agent"),
914    (semconv::attribute::HTTP_STATUS_CODE, "/http/status_code"),
915    // https://cloud.google.com/trace/docs/trace-labels#canonical-gke
916    (
917        semconv::attribute::HTTP_RESPONSE_STATUS_CODE,
918        "/http/status_code",
919    ),
920    (
921        semconv::attribute::K8S_CLUSTER_NAME,
922        "g.co/r/k8s_container/cluster_name",
923    ),
924    (
925        semconv::attribute::K8S_NAMESPACE_NAME,
926        "g.co/r/k8s_container/namespace",
927    ),
928    (
929        semconv::attribute::K8S_POD_NAME,
930        "g.co/r/k8s_container/pod_name",
931    ),
932    (
933        semconv::attribute::K8S_CONTAINER_NAME,
934        "g.co/r/k8s_container/container_name",
935    ),
936    (semconv::trace::HTTP_ROUTE, "/http/route"),
937    (HTTP_PATH, GCP_HTTP_PATH),
938];
939
940const HTTP_PATH: &str = "http.path";
941const GCP_HTTP_PATH: &str = "/http/path";
942
943impl From<opentelemetry::trace::SpanKind> for SpanKind {
944    fn from(span_kind: opentelemetry::trace::SpanKind) -> Self {
945        match span_kind {
946            opentelemetry::trace::SpanKind::Client => SpanKind::Client,
947            opentelemetry::trace::SpanKind::Server => SpanKind::Server,
948            opentelemetry::trace::SpanKind::Producer => SpanKind::Producer,
949            opentelemetry::trace::SpanKind::Consumer => SpanKind::Consumer,
950            opentelemetry::trace::SpanKind::Internal => SpanKind::Internal,
951        }
952    }
953}
954
955fn status(value: opentelemetry::trace::Status) -> Option<Status> {
956    match value {
957        opentelemetry::trace::Status::Ok => Some(Status {
958            code: Code::Ok as i32,
959            message: "".to_owned(),
960            details: vec![],
961        }),
962        opentelemetry::trace::Status::Unset => None,
963        opentelemetry::trace::Status::Error { description } => Some(Status {
964            code: Code::Unknown as i32,
965            message: description.into(),
966            details: vec![],
967        }),
968    }
969}
/// Maximum number of entries `Attributes::push` stores for one span; attributes arriving
/// after the map has reached this size are counted as dropped.
const MAX_ATTRIBUTES_PER_SPAN: usize = 32;

/// Google API authorization scope URL for appending trace data.
/// NOTE(review): presumably requested when authorizing span export — confirm at the use site.
const TRACE_APPEND: &str = "https://www.googleapis.com/auth/trace.append";

/// Google API authorization scope URL for writing log entries.
/// NOTE(review): presumably requested when authorizing log export — confirm at the use site.
const LOGGING_WRITE: &str = "https://www.googleapis.com/auth/logging.write";
973
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry::{KeyValue, Value};

    /// Builds the `AttributeValue` expected for a string-valued attribute.
    fn string_value(value: &'static str) -> AttributeValue {
        AttributeValue::from(Value::String(value.into()))
    }

    /// Well-known HTTP attributes are stored under their GCP labels, while an unmapped
    /// resource attribute keeps its original key.
    #[test]
    fn test_attributes_mapping() {
        let attributes = vec![
            // hostAttribute       = "http.host"
            KeyValue::new(semconv::attribute::HTTP_HOST, "example.com:8080"),
            // methodAttribute     = "http.method"
            KeyValue::new(semconv::attribute::HTTP_METHOD, "POST"),
            // pathAttribute       = "http.path"
            KeyValue::new(HTTP_PATH, "/path/12314/?q=ddds#123"),
            // urlAttribute        = "http.url"
            KeyValue::new(
                semconv::attribute::HTTP_URL,
                "https://example.com:8080/webshop/articles/4?s=1",
            ),
            // userAgentAttribute  = "http.user_agent"
            KeyValue::new(
                semconv::attribute::HTTP_USER_AGENT,
                "CERN-LineMode/2.15 libwww/2.17b3",
            ),
            // statusCodeAttribute = "http.status_code"
            KeyValue::new(semconv::attribute::HTTP_STATUS_CODE, 200i64),
            // routeAttribute      = "http.route"
            KeyValue::new(semconv::trace::HTTP_ROUTE, "/webshop/articles/:article_id"),
        ];

        // serviceAttribute    = "service.name"
        let resources = Resource::builder_empty()
            .with_attributes([KeyValue::new(
                semconv::resource::SERVICE_NAME,
                "Test Service Name",
            )])
            .build();

        let actual = Attributes::new(attributes, Some(&resources));

        // Seven span attributes plus the one resource attribute.
        assert_eq!(actual.attribute_map.len(), 8);
        assert_eq!(actual.dropped_attributes_count, 0);
        assert_eq!(
            actual.attribute_map.get("/http/host"),
            Some(&string_value("example.com:8080")),
        );
        assert_eq!(
            actual.attribute_map.get("/http/method"),
            Some(&string_value("POST")),
        );
        assert_eq!(
            actual.attribute_map.get("/http/path"),
            Some(&string_value("/path/12314/?q=ddds#123")),
        );
        assert_eq!(
            actual.attribute_map.get("/http/route"),
            Some(&string_value("/webshop/articles/:article_id")),
        );
        assert_eq!(
            actual.attribute_map.get("/http/url"),
            Some(&string_value(
                "https://example.com:8080/webshop/articles/4?s=1"
            )),
        );
        assert_eq!(
            actual.attribute_map.get("/http/user_agent"),
            Some(&string_value("CERN-LineMode/2.15 libwww/2.17b3")),
        );
        assert_eq!(
            actual.attribute_map.get("/http/status_code"),
            Some(&AttributeValue::from(Value::I64(200))),
        );
        // Keys without a GCP counterpart are stored unchanged.
        assert_eq!(
            actual.attribute_map.get(semconv::resource::SERVICE_NAME),
            Some(&string_value("Test Service Name")),
        );
    }

    /// With one resource attribute and 32 span attributes, the resource attribute is kept
    /// and exactly one span attribute is dropped.
    #[test]
    fn test_too_many() {
        let resources = Resource::builder_empty()
            .with_attributes([KeyValue::new(
                semconv::attribute::USER_AGENT_ORIGINAL,
                "Test Service Name UA",
            )])
            .build();

        let attributes: Vec<KeyValue> = (0..32)
            .map(|i| {
                KeyValue::new(
                    format!("key{}", i),
                    Value::String(format!("value{}", i).into()),
                )
            })
            .collect();

        let actual = Attributes::new(attributes, Some(&resources));
        assert_eq!(actual.attribute_map.len(), 32);
        assert_eq!(actual.dropped_attributes_count, 1);
        assert_eq!(
            actual.attribute_map.get("/http/user_agent"),
            Some(&string_value("Test Service Name UA")),
        );
    }

    /// The legacy `http.target` attribute is stored under the `/http/path` label.
    #[test]
    fn test_attributes_mapping_http_target() {
        // targetAttribute     = "http.target"
        let attributes = vec![KeyValue::new(
            semconv::attribute::HTTP_TARGET,
            "/path/12314/?q=ddds#123",
        )];

        let resources = Resource::builder_empty().with_attributes([]).build();
        let actual = Attributes::new(attributes, Some(&resources));
        assert_eq!(actual.attribute_map.len(), 1);
        assert_eq!(actual.dropped_attributes_count, 0);
        assert_eq!(
            actual.attribute_map.get("/http/path"),
            Some(&string_value("/path/12314/?q=ddds#123")),
        );
    }

    /// An attribute whose key exceeds the 128-byte limit is dropped and counted.
    #[test]
    fn test_attributes_mapping_dropped_attributes_count() {
        const LONG_KEY: &str = "long_attribute_key_dvwmacxpeefbuemoxljmqvldjxmvvihoeqnuqdsyovwgljtnemouidabhkmvsnauwfnaihekcfwhugejboiyfthyhmkpsaxtidlsbwsmirebax";
        // Guard the fixture itself: the key must really be over the limit.
        assert!(LONG_KEY.len() > 128);

        let attributes = vec![
            KeyValue::new("answer", Value::I64(42)),
            KeyValue::new(LONG_KEY, Value::String("Some value".into())),
        ];

        let resources = Resource::builder_empty().with_attributes([]).build();
        let actual = Attributes::new(attributes, Some(&resources));
        assert_eq!(
            actual,
            Attributes {
                attribute_map: HashMap::from([(
                    "answer".into(),
                    AttributeValue::from(Value::I64(42))
                )]),
                dropped_attributes_count: 1,
            }
        );
        assert_eq!(actual.attribute_map.len(), 1);
        assert_eq!(actual.dropped_attributes_count, 1);
    }
}