1#![cfg(not(doctest))]
2#![allow(
7 clippy::doc_lazy_continuation,
8 deprecated,
9 rustdoc::bare_urls,
10 rustdoc::broken_intra_doc_links,
11 rustdoc::invalid_rust_codeblocks
12)]
13
14use std::{
15 borrow::Cow,
16 collections::HashMap,
17 fmt,
18 future::Future,
19 sync::{
20 atomic::{AtomicUsize, Ordering},
21 Arc, RwLock,
22 },
23 time::{Duration, Instant},
24};
25
26use futures_util::stream::StreamExt;
27use opentelemetry::{otel_error, trace::SpanId, Key, KeyValue, Value};
28use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
29use opentelemetry_sdk::{
30 trace::{SpanData, SpanExporter},
31 Resource,
32};
33use opentelemetry_semantic_conventions as semconv;
34use thiserror::Error;
35#[cfg(feature = "gcp-authorizer")]
36use tonic::metadata::MetadataValue;
37#[cfg(any(
38 feature = "tls-ring",
39 feature = "tls-native-roots",
40 feature = "tls-webpki-roots"
41))]
42use tonic::transport::ClientTlsConfig;
43use tonic::{transport::Channel, Code, Request};
44
45#[allow(clippy::derive_partial_eq_without_eq)] #[allow(clippy::doc_overindented_list_items)]
47pub mod proto;
48
49#[cfg(feature = "propagator")]
50pub mod google_trace_context_propagator;
51
52use proto::devtools::cloudtrace::v2::span::time_event::Annotation;
53use proto::devtools::cloudtrace::v2::span::{
54 Attributes, Link, Links, SpanKind, TimeEvent, TimeEvents,
55};
56use proto::devtools::cloudtrace::v2::trace_service_client::TraceServiceClient;
57use proto::devtools::cloudtrace::v2::{
58 AttributeValue, BatchWriteSpansRequest, Span, TruncatableString,
59};
60use proto::logging::v2::{
61 log_entry::Payload, logging_service_v2_client::LoggingServiceV2Client, LogEntry,
62 LogEntrySourceLocation, WriteLogEntriesRequest,
63};
64use proto::rpc::Status;
65
66#[derive(Clone)]
71pub struct StackDriverExporter {
72 tx: futures_channel::mpsc::Sender<Vec<SpanData>>,
73 pending_count: Arc<AtomicUsize>,
74 maximum_shutdown_duration: Duration,
75 resource: Arc<RwLock<Option<Resource>>>,
76}
77
78impl StackDriverExporter {
79 pub fn builder() -> Builder {
80 Builder::default()
81 }
82
83 pub fn pending_count(&self) -> usize {
84 self.pending_count.load(Ordering::Relaxed)
85 }
86}
87
88impl SpanExporter for StackDriverExporter {
89 async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
90 match self.tx.clone().try_send(batch) {
91 Err(e) => Err(OTelSdkError::InternalFailure(format!("{:?}", e))),
92 Ok(()) => {
93 self.pending_count.fetch_add(1, Ordering::Relaxed);
94 Ok(())
95 }
96 }
97 }
98
99 fn shutdown(&mut self) -> OTelSdkResult {
100 let start = Instant::now();
101 while (Instant::now() - start) < self.maximum_shutdown_duration && self.pending_count() > 0
102 {
103 std::thread::yield_now();
104 }
106 Ok(())
107 }
108
109 fn set_resource(&mut self, resource: &Resource) {
110 match self.resource.write() {
111 Ok(mut guard) => *guard = Some(resource.clone()),
112 Err(poisoned) => *poisoned.into_inner() = Some(resource.clone()),
113 }
114 }
115}
116
117impl fmt::Debug for StackDriverExporter {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 #[allow(clippy::unneeded_field_pattern)]
120 let Self {
121 tx: _,
122 pending_count,
123 maximum_shutdown_duration,
124 resource: _,
125 } = self;
126 f.debug_struct("StackDriverExporter")
127 .field("tx", &"(elided)")
128 .field("pending_count", pending_count)
129 .field("maximum_shutdown_duration", maximum_shutdown_duration)
130 .finish()
131 }
132}
133
134#[derive(Clone, Default)]
136pub struct Builder {
137 maximum_shutdown_duration: Option<Duration>,
138 num_concurrent_requests: Option<usize>,
139 log_context: Option<LogContext>,
140}
141
142impl Builder {
143 pub fn maximum_shutdown_duration(mut self, duration: Duration) -> Self {
147 self.maximum_shutdown_duration = Some(duration);
148 self
149 }
150
151 pub fn num_concurrent_requests(mut self, num_concurrent_requests: usize) -> Self {
155 self.num_concurrent_requests = Some(num_concurrent_requests);
156 self
157 }
158
159 pub fn log_context(mut self, log_context: LogContext) -> Self {
161 self.log_context = Some(log_context);
162 self
163 }
164
165 pub async fn build<A: Authorizer>(
166 self,
167 authenticator: A,
168 ) -> Result<(StackDriverExporter, impl Future<Output = ()>), Error>
169 where
170 Error: From<A::Error>,
171 {
172 let Self {
173 maximum_shutdown_duration,
174 num_concurrent_requests,
175 log_context,
176 } = self;
177 let uri = http::uri::Uri::from_static("https://cloudtrace.googleapis.com:443");
178
179 #[cfg(any(
180 feature = "tls-ring",
181 feature = "tls-native-roots",
182 feature = "tls-webpki-roots"
183 ))]
184 let tls_config = ClientTlsConfig::new().with_enabled_roots();
185
186 let trace_channel_builder = Channel::builder(uri);
187 #[cfg(any(
188 feature = "tls-ring",
189 feature = "tls-native-roots",
190 feature = "tls-webpki-roots"
191 ))]
192 let trace_channel_builder = trace_channel_builder
193 .tls_config(tls_config.clone())
194 .map_err(|e| Error::Transport(e.into()))?;
195
196 let trace_channel = trace_channel_builder
197 .connect()
198 .await
199 .map_err(|e| Error::Transport(e.into()))?;
200
201 let log_client = match log_context {
202 Some(log_context) => {
203 let log_channel_builder = Channel::builder(http::uri::Uri::from_static(
204 "https://logging.googleapis.com:443",
205 ));
206 #[cfg(any(
207 feature = "tls-ring",
208 feature = "tls-native-roots",
209 feature = "tls-webpki-roots"
210 ))]
211 let log_channel_builder = log_channel_builder
212 .tls_config(tls_config)
213 .map_err(|e| Error::Transport(e.into()))?;
214
215 let log_channel = log_channel_builder
216 .connect()
217 .await
218 .map_err(|e| Error::Transport(e.into()))?;
219
220 Some(LogClient {
221 client: LoggingServiceV2Client::new(log_channel),
222 context: Arc::new(InternalLogContext::from(log_context)),
223 })
224 }
225 None => None,
226 };
227
228 let (tx, rx) = futures_channel::mpsc::channel(64);
229 let pending_count = Arc::new(AtomicUsize::new(0));
230 let scopes = Arc::new(match log_client {
231 Some(_) => vec![TRACE_APPEND, LOGGING_WRITE],
232 None => vec![TRACE_APPEND],
233 });
234
235 let count_clone = pending_count.clone();
236 let resource = Arc::new(RwLock::new(None));
237 let ctx_resource = resource.clone();
238 let future = async move {
239 let trace_client = TraceServiceClient::new(trace_channel);
240 let authorizer = &authenticator;
241 let log_client = log_client.clone();
242 rx.for_each_concurrent(num_concurrent_requests, move |batch| {
243 let trace_client = trace_client.clone();
244 let log_client = log_client.clone();
245 let pending_count = count_clone.clone();
246 let scopes = scopes.clone();
247 let resource = ctx_resource.clone();
248 ExporterContext {
249 trace_client,
250 log_client,
251 authorizer,
252 pending_count,
253 scopes,
254 resource,
255 }
256 .export(batch)
257 })
258 .await
259 };
260
261 let exporter = StackDriverExporter {
262 tx,
263 pending_count,
264 maximum_shutdown_duration: maximum_shutdown_duration
265 .unwrap_or_else(|| Duration::from_secs(5)),
266 resource,
267 };
268
269 Ok((exporter, future))
270 }
271}
272
273struct ExporterContext<'a, A> {
274 trace_client: TraceServiceClient<Channel>,
275 log_client: Option<LogClient>,
276 authorizer: &'a A,
277 pending_count: Arc<AtomicUsize>,
278 scopes: Arc<Vec<&'static str>>,
279 resource: Arc<RwLock<Option<Resource>>>,
280}
281
282impl<A: Authorizer> ExporterContext<'_, A>
283where
284 Error: From<A::Error>,
285{
286 async fn export(mut self, batch: Vec<SpanData>) {
287 use proto::devtools::cloudtrace::v2::span::time_event::Value;
288
289 let mut entries = Vec::new();
290 let mut spans = Vec::with_capacity(batch.len());
291 for span in batch {
292 let trace_id = hex::encode(span.span_context.trace_id().to_bytes());
293 let span_id = hex::encode(span.span_context.span_id().to_bytes());
294 let time_event = match &self.log_client {
295 None => span
296 .events
297 .into_iter()
298 .map(|event| TimeEvent {
299 time: Some(event.timestamp.into()),
300 value: Some(Value::Annotation(Annotation {
301 description: Some(to_truncate(event.name.into_owned())),
302 ..Default::default()
303 })),
304 })
305 .collect(),
306 Some(client) => {
307 entries.extend(span.events.into_iter().map(|event| {
308 let (mut level, mut target, mut labels) =
309 (LogSeverity::Default, None, HashMap::default());
310 for kv in event.attributes {
311 match kv.key.as_str() {
312 "level" => {
313 level = match kv.value.as_str().as_ref() {
314 "DEBUG" | "TRACE" => LogSeverity::Debug,
315 "INFO" => LogSeverity::Info,
316 "WARN" => LogSeverity::Warning,
317 "ERROR" => LogSeverity::Error,
318 _ => LogSeverity::Default, }
320 }
321 "target" => target = Some(kv.value.as_str().into_owned()),
322 key => {
323 labels.insert(key.to_owned(), kv.value.as_str().into_owned());
324 }
325 }
326 }
327 let project_id = self.authorizer.project_id();
328 let log_id = &client.context.log_id;
329 LogEntry {
330 log_name: format!("projects/{project_id}/logs/{log_id}"),
331 resource: Some(client.context.resource.clone()),
332 severity: level as i32,
333 timestamp: Some(event.timestamp.into()),
334 labels,
335 trace: format!("projects/{project_id}/traces/{trace_id}"),
336 span_id: span_id.clone(),
337 source_location: target.map(|target| LogEntrySourceLocation {
338 file: String::new(),
339 line: 0,
340 function: target,
341 }),
342 payload: Some(Payload::TextPayload(event.name.into_owned())),
343 ..Default::default()
345 }
346 }));
347
348 vec![]
349 }
350 };
351
352 let resource = self.resource.read().ok();
353 let attributes = match resource {
354 Some(resource) => Attributes::new(span.attributes, resource.as_ref()),
355 None => Attributes::new(span.attributes, None),
356 };
357
358 spans.push(Span {
359 name: format!(
360 "projects/{}/traces/{}/spans/{}",
361 self.authorizer.project_id(),
362 hex::encode(span.span_context.trace_id().to_bytes()),
363 hex::encode(span.span_context.span_id().to_bytes())
364 ),
365 display_name: Some(to_truncate(span.name.into_owned())),
366 span_id: hex::encode(span.span_context.span_id().to_bytes()),
367 parent_span_id: match span.parent_span_id {
370 SpanId::INVALID => "".to_owned(),
371 _ => hex::encode(span.parent_span_id.to_bytes()),
372 },
373 start_time: Some(span.start_time.into()),
374 end_time: Some(span.end_time.into()),
375 attributes: Some(attributes),
376 time_events: Some(TimeEvents {
377 time_event,
378 ..Default::default()
379 }),
380 links: transform_links(&span.links),
381 status: status(span.status),
382 span_kind: SpanKind::from(span.span_kind) as i32,
383 ..Default::default()
384 });
385 }
386
387 let mut req = Request::new(BatchWriteSpansRequest {
388 name: format!("projects/{}", self.authorizer.project_id()),
389 spans,
390 });
391
392 self.pending_count.fetch_sub(1, Ordering::Relaxed);
393 if let Err(e) = self.authorizer.authorize(&mut req, &self.scopes).await {
394 otel_error!(name: "ExportAuthorizeError", error = format!("{:?}", e));
395 } else if let Err(e) = self.trace_client.batch_write_spans(req).await {
396 otel_error!(name: "ExportTransportError", error = format!("{:?}", e));
397 }
398
399 let client = match &mut self.log_client {
400 Some(client) => client,
401 None => return,
402 };
403
404 let mut req = Request::new(WriteLogEntriesRequest {
405 log_name: format!(
406 "projects/{}/logs/{}",
407 self.authorizer.project_id(),
408 client.context.log_id,
409 ),
410 entries,
411 dry_run: false,
412 labels: HashMap::default(),
413 partial_success: true,
414 resource: None,
415 });
416
417 if let Err(e) = self.authorizer.authorize(&mut req, &self.scopes).await {
418 otel_error!(name: "ExportAuthorizeError", error = format!("{:?}", e));
419 } else if let Err(e) = client.client.write_log_entries(req).await {
420 otel_error!(name: "ExportTransportError", error = format!("{:?}", e));
421 }
422 }
423}
424
/// [`Authorizer`] implementation backed by a `gcp_auth` token provider.
#[cfg(feature = "gcp-authorizer")]
pub struct GcpAuthorizer {
    /// Source of the bearer tokens attached to outgoing requests.
    provider: Arc<dyn gcp_auth::TokenProvider>,
    /// Project id returned by `Authorizer::project_id`.
    project_id: Arc<str>,
}
430
#[cfg(feature = "gcp-authorizer")]
impl GcpAuthorizer {
    /// Creates an authorizer from `gcp_auth::provider()`, asking that provider for the
    /// project id.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Authorizer`] when obtaining the provider or its project id fails.
    pub async fn new() -> Result<Self, Error> {
        let provider = match gcp_auth::provider().await {
            Ok(provider) => provider,
            Err(e) => return Err(Error::Authorizer(e.into())),
        };

        let project_id = match provider.project_id().await {
            Ok(project_id) => project_id,
            Err(e) => return Err(Error::Authorizer(e.into())),
        };

        Ok(Self::from_gcp_auth(provider, project_id))
    }

    /// Wraps an existing token provider and an already-known project id.
    pub fn from_gcp_auth(provider: Arc<dyn gcp_auth::TokenProvider>, project_id: Arc<str>) -> Self {
        Self {
            provider,
            project_id,
        }
    }
}
455
#[cfg(feature = "gcp-authorizer")]
impl Authorizer for GcpAuthorizer {
    type Error = Error;

    /// Returns the project id this authorizer was created with.
    fn project_id(&self) -> &str {
        &self.project_id
    }

    /// Fetches a token for `scopes` and sets it as the request's `authorization` metadata
    /// in the form `Bearer <token>`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::Authorizer`] when the token cannot be obtained, or when the token
    /// cannot be represented as an ASCII metadata value.
    async fn authorize<T: Send + Sync>(
        &self,
        req: &mut Request<T>,
        scopes: &[&str],
    ) -> Result<(), Self::Error> {
        let token = self
            .provider
            .token(scopes)
            .await
            .map_err(|e| Error::Authorizer(e.into()))?;

        // A token containing bytes that are invalid in a header is reported as an error
        // instead of panicking inside the export future.
        let header =
            MetadataValue::<tonic::metadata::Ascii>::try_from(format!("Bearer {}", token.as_str()))
                .map_err(|e| Error::Authorizer(e.into()))?;
        req.metadata_mut().insert("authorization", header);

        Ok(())
    }
}
483
484pub trait Authorizer: Sync + Send + 'static {
485 type Error: std::error::Error + fmt::Debug + Send + Sync;
486
487 fn project_id(&self) -> &str;
488 fn authorize<T: Send + Sync>(
489 &self,
490 request: &mut Request<T>,
491 scopes: &[&str],
492 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
493}
494
495impl From<Value> for AttributeValue {
496 fn from(v: Value) -> AttributeValue {
497 use proto::devtools::cloudtrace::v2::attribute_value;
498 let new_value = match v {
499 Value::Bool(v) => attribute_value::Value::BoolValue(v),
500 Value::F64(v) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
501 Value::I64(v) => attribute_value::Value::IntValue(v),
502 Value::String(v) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
503 Value::Array(_) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
504 _ => attribute_value::Value::StringValue(to_truncate("".to_string())),
505 };
506 AttributeValue {
507 value: Some(new_value),
508 }
509 }
510}
511
512fn to_truncate(s: String) -> TruncatableString {
513 TruncatableString {
514 value: s,
515 ..Default::default()
516 }
517}
518
519#[derive(Debug, Error)]
520pub enum Error {
521 #[error("authorizer error: {0}")]
522 Authorizer(#[source] Box<dyn std::error::Error + Send + Sync>),
523 #[error("I/O error: {0}")]
524 Io(#[from] std::io::Error),
525 #[error("{0}")]
526 Other(#[from] Box<dyn std::error::Error + Send + Sync>),
527 #[error("tonic error: {0}")]
528 Transport(#[source] Box<dyn std::error::Error + Send + Sync>),
529}
530
531impl opentelemetry_sdk::ExportError for Error {
532 fn exporter_name(&self) -> &'static str {
533 "stackdriver"
534 }
535}
536
/// Severity assigned to a log entry; the discriminant is written to `LogEntry::severity`
/// as an `i32`.
enum LogSeverity {
    /// Used when an event has no `level` attribute or an unrecognized one.
    Default = 0,
    /// Used for the `DEBUG` and `TRACE` levels.
    Debug = 100,
    /// Used for the `INFO` level.
    Info = 200,
    /// Used for the `WARN` level.
    Warning = 400,
    /// Used for the `ERROR` level.
    Error = 500,
}
545
546#[derive(Clone)]
547struct LogClient {
548 client: LoggingServiceV2Client<Channel>,
549 context: Arc<InternalLogContext>,
550}
551
552struct InternalLogContext {
553 log_id: String,
554 resource: proto::api::MonitoredResource,
555}
556
557#[derive(Clone)]
558pub struct LogContext {
559 pub log_id: String,
560 pub resource: MonitoredResource,
561}
562
563impl From<LogContext> for InternalLogContext {
564 fn from(cx: LogContext) -> Self {
565 let mut labels = HashMap::default();
566 let resource = match cx.resource {
567 MonitoredResource::AppEngine {
568 project_id,
569 module_id,
570 version_id,
571 zone,
572 } => {
573 labels.insert("project_id".to_string(), project_id);
574 if let Some(module_id) = module_id {
575 labels.insert("module_id".to_string(), module_id);
576 }
577 if let Some(version_id) = version_id {
578 labels.insert("version_id".to_string(), version_id);
579 }
580 if let Some(zone) = zone {
581 labels.insert("zone".to_string(), zone);
582 }
583
584 proto::api::MonitoredResource {
585 r#type: "gae_app".to_owned(),
586 labels,
587 }
588 }
589 MonitoredResource::CloudFunction {
590 project_id,
591 function_name,
592 region,
593 } => {
594 labels.insert("project_id".to_string(), project_id);
595 if let Some(function_name) = function_name {
596 labels.insert("function_name".to_string(), function_name);
597 }
598 if let Some(region) = region {
599 labels.insert("region".to_string(), region);
600 }
601
602 proto::api::MonitoredResource {
603 r#type: "cloud_function".to_owned(),
604 labels,
605 }
606 }
607 MonitoredResource::CloudRunJob {
608 project_id,
609 job_name,
610 location,
611 } => {
612 labels.insert("project_id".to_string(), project_id);
613 if let Some(job_name) = job_name {
614 labels.insert("job_name".to_string(), job_name);
615 }
616 if let Some(location) = location {
617 labels.insert("location".to_string(), location);
618 }
619
620 proto::api::MonitoredResource {
621 r#type: "cloud_run_job".to_owned(),
622 labels,
623 }
624 }
625 MonitoredResource::CloudRunRevision {
626 project_id,
627 service_name,
628 revision_name,
629 location,
630 configuration_name,
631 } => {
632 labels.insert("project_id".to_string(), project_id);
633 if let Some(service_name) = service_name {
634 labels.insert("service_name".to_string(), service_name);
635 }
636 if let Some(revision_name) = revision_name {
637 labels.insert("revision_name".to_string(), revision_name);
638 }
639 if let Some(location) = location {
640 labels.insert("location".to_string(), location);
641 }
642 if let Some(configuration_name) = configuration_name {
643 labels.insert("configuration_name".to_string(), configuration_name);
644 }
645
646 proto::api::MonitoredResource {
647 r#type: "cloud_run_revision".to_owned(),
648 labels,
649 }
650 }
651
652 MonitoredResource::ComputeEngine {
653 project_id,
654 instance_id,
655 zone,
656 } => {
657 labels.insert("project_id".to_string(), project_id);
658 if let Some(instance_id) = instance_id {
659 labels.insert("instance_id".to_string(), instance_id);
660 }
661 if let Some(zone) = zone {
662 labels.insert("zone".to_string(), zone);
663 }
664
665 proto::api::MonitoredResource {
666 r#type: "gce_instance".to_owned(),
667 labels,
668 }
669 }
670
671 MonitoredResource::GenericNode {
672 project_id,
673 location,
674 namespace,
675 node_id,
676 } => {
677 labels.insert("project_id".to_string(), project_id);
678 if let Some(location) = location {
679 labels.insert("location".to_string(), location);
680 }
681 if let Some(namespace) = namespace {
682 labels.insert("namespace".to_string(), namespace);
683 }
684 if let Some(node_id) = node_id {
685 labels.insert("node_id".to_string(), node_id);
686 }
687
688 proto::api::MonitoredResource {
689 r#type: "generic_node".to_owned(),
690 labels,
691 }
692 }
693 MonitoredResource::GenericTask {
694 project_id,
695 location,
696 namespace,
697 job,
698 task_id,
699 } => {
700 labels.insert("project_id".to_owned(), project_id);
701 if let Some(location) = location {
702 labels.insert("location".to_owned(), location);
703 }
704 if let Some(namespace) = namespace {
705 labels.insert("namespace".to_owned(), namespace);
706 }
707 if let Some(job) = job {
708 labels.insert("job".to_owned(), job);
709 }
710 if let Some(task_id) = task_id {
711 labels.insert("task_id".to_owned(), task_id);
712 }
713
714 proto::api::MonitoredResource {
715 r#type: "generic_task".to_owned(),
716 labels,
717 }
718 }
719 MonitoredResource::Global { project_id } => {
720 labels.insert("project_id".to_owned(), project_id);
721 proto::api::MonitoredResource {
722 r#type: "global".to_owned(),
723 labels,
724 }
725 }
726 MonitoredResource::KubernetesEngine {
727 project_id,
728 cluster_name,
729 location,
730 pod_name,
731 namespace_name,
732 container_name,
733 } => {
734 labels.insert("project_id".to_string(), project_id);
735 if let Some(cluster_name) = cluster_name {
736 labels.insert("cluster_name".to_string(), cluster_name);
737 }
738 if let Some(location) = location {
739 labels.insert("location".to_string(), location);
740 }
741 if let Some(pod_name) = pod_name {
742 labels.insert("pod_name".to_string(), pod_name);
743 }
744 if let Some(namespace_name) = namespace_name {
745 labels.insert("namespace_name".to_string(), namespace_name);
746 }
747 if let Some(container_name) = container_name {
748 labels.insert("container_name".to_string(), container_name);
749 }
750
751 proto::api::MonitoredResource {
752 r#type: "k8s_container".to_owned(),
753 labels,
754 }
755 }
756 };
757
758 Self {
759 log_id: cx.log_id,
760 resource,
761 }
762 }
763}
764
/// Monitored resource that log entries are attributed to.
///
/// Each variant is converted to a protobuf monitored resource whose labels are named after
/// the variant's fields; `None` fields are left out.
#[derive(Clone)]
pub enum MonitoredResource {
    /// Sent with resource type `gae_app`.
    AppEngine {
        project_id: String,
        module_id: Option<String>,
        version_id: Option<String>,
        zone: Option<String>,
    },
    /// Sent with resource type `cloud_function`.
    CloudFunction {
        project_id: String,
        function_name: Option<String>,
        region: Option<String>,
    },
    /// Sent with resource type `cloud_run_job`.
    CloudRunJob {
        project_id: String,
        job_name: Option<String>,
        location: Option<String>,
    },
    /// Sent with resource type `cloud_run_revision`.
    CloudRunRevision {
        project_id: String,
        service_name: Option<String>,
        revision_name: Option<String>,
        location: Option<String>,
        configuration_name: Option<String>,
    },
    /// Sent with resource type `gce_instance`.
    ComputeEngine {
        project_id: String,
        instance_id: Option<String>,
        zone: Option<String>,
    },
    /// Sent with resource type `k8s_container`.
    KubernetesEngine {
        project_id: String,
        location: Option<String>,
        cluster_name: Option<String>,
        namespace_name: Option<String>,
        pod_name: Option<String>,
        container_name: Option<String>,
    },
    /// Sent with resource type `generic_node`.
    GenericNode {
        project_id: String,
        location: Option<String>,
        namespace: Option<String>,
        node_id: Option<String>,
    },
    /// Sent with resource type `generic_task`.
    GenericTask {
        project_id: String,
        location: Option<String>,
        namespace: Option<String>,
        job: Option<String>,
        task_id: Option<String>,
    },
    /// Sent with resource type `global`.
    Global {
        project_id: String,
    },
}
824
825impl Attributes {
826 fn new(attributes: Vec<KeyValue>, resource: Option<&Resource>) -> Self {
830 let mut new = Self {
831 dropped_attributes_count: 0,
832 attribute_map: HashMap::with_capacity(Ord::min(
833 MAX_ATTRIBUTES_PER_SPAN,
834 attributes.len() + resource.map_or(0, |r| r.len()),
835 )),
836 };
837
838 if let Some(resource) = resource {
839 for (k, v) in resource.iter() {
840 new.push(Cow::Borrowed(k), Cow::Borrowed(v));
841 }
842 }
843
844 for kv in attributes {
845 new.push(Cow::Owned(kv.key), Cow::Owned(kv.value));
846 }
847
848 new
849 }
850
851 fn push(&mut self, key: Cow<'_, Key>, value: Cow<'_, Value>) {
852 if self.attribute_map.len() >= MAX_ATTRIBUTES_PER_SPAN {
853 self.dropped_attributes_count += 1;
854 return;
855 }
856
857 let key_str = key.as_str();
858 if key_str.len() > 128 {
859 self.dropped_attributes_count += 1;
860 return;
861 }
862
863 for (otel_key, gcp_key) in KEY_MAP {
864 if otel_key == key_str {
865 self.attribute_map
866 .insert(gcp_key.to_owned(), value.into_owned().into());
867 return;
868 }
869 }
870
871 self.attribute_map.insert(
872 match key {
873 Cow::Owned(k) => k.to_string(),
874 Cow::Borrowed(k) => k.to_string(),
875 },
876 value.into_owned().into(),
877 );
878 }
879}
880
881fn transform_links(links: &opentelemetry_sdk::trace::SpanLinks) -> Option<Links> {
882 if links.is_empty() {
883 return None;
884 }
885
886 Some(Links {
887 dropped_links_count: links.dropped_count as i32,
888 link: links
889 .iter()
890 .map(|link| Link {
891 trace_id: hex::encode(link.span_context.trace_id().to_bytes()),
892 span_id: hex::encode(link.span_context.span_id().to_bytes()),
893 ..Default::default()
894 })
895 .collect(),
896 })
897}
898
899const KEY_MAP: [(&str, &str); 19] = [
903 (HTTP_PATH, GCP_HTTP_PATH),
904 (semconv::attribute::HTTP_HOST, "/http/host"),
905 ("http.request.header.host", "/http/host"),
906 (semconv::attribute::HTTP_METHOD, "/http/method"),
907 (semconv::attribute::HTTP_REQUEST_METHOD, "/http/method"),
908 (semconv::attribute::HTTP_TARGET, "/http/path"),
909 (semconv::attribute::URL_PATH, "/http/path"),
910 (semconv::attribute::HTTP_URL, "/http/url"),
911 (semconv::attribute::URL_FULL, "/http/url"),
912 (semconv::attribute::HTTP_USER_AGENT, "/http/user_agent"),
913 (semconv::attribute::USER_AGENT_ORIGINAL, "/http/user_agent"),
914 (semconv::attribute::HTTP_STATUS_CODE, "/http/status_code"),
915 (
917 semconv::attribute::HTTP_RESPONSE_STATUS_CODE,
918 "/http/status_code",
919 ),
920 (
921 semconv::attribute::K8S_CLUSTER_NAME,
922 "g.co/r/k8s_container/cluster_name",
923 ),
924 (
925 semconv::attribute::K8S_NAMESPACE_NAME,
926 "g.co/r/k8s_container/namespace",
927 ),
928 (
929 semconv::attribute::K8S_POD_NAME,
930 "g.co/r/k8s_container/pod_name",
931 ),
932 (
933 semconv::attribute::K8S_CONTAINER_NAME,
934 "g.co/r/k8s_container/container_name",
935 ),
936 (semconv::trace::HTTP_ROUTE, "/http/route"),
937 (HTTP_PATH, GCP_HTTP_PATH),
938];
939
/// Attribute key `http.path`, translated to [`GCP_HTTP_PATH`] by `KEY_MAP`.
const HTTP_PATH: &str = "http.path";
/// Key under which an [`HTTP_PATH`] attribute is stored in the exported span.
const GCP_HTTP_PATH: &str = "/http/path";
942
943impl From<opentelemetry::trace::SpanKind> for SpanKind {
944 fn from(span_kind: opentelemetry::trace::SpanKind) -> Self {
945 match span_kind {
946 opentelemetry::trace::SpanKind::Client => SpanKind::Client,
947 opentelemetry::trace::SpanKind::Server => SpanKind::Server,
948 opentelemetry::trace::SpanKind::Producer => SpanKind::Producer,
949 opentelemetry::trace::SpanKind::Consumer => SpanKind::Consumer,
950 opentelemetry::trace::SpanKind::Internal => SpanKind::Internal,
951 }
952 }
953}
954
955fn status(value: opentelemetry::trace::Status) -> Option<Status> {
956 match value {
957 opentelemetry::trace::Status::Ok => Some(Status {
958 code: Code::Ok as i32,
959 message: "".to_owned(),
960 details: vec![],
961 }),
962 opentelemetry::trace::Status::Unset => None,
963 opentelemetry::trace::Status::Error { description } => Some(Status {
964 code: Code::Unknown as i32,
965 message: description.into(),
966 details: vec![],
967 }),
968 }
969}
/// Scope always passed to the authorizer by the exporter.
const TRACE_APPEND: &str = "https://www.googleapis.com/auth/trace.append";
/// Scope passed to the authorizer in addition when a logging client is configured.
const LOGGING_WRITE: &str = "https://www.googleapis.com/auth/logging.write";
/// Maximum number of entries kept in a span's attribute map; further attributes are dropped.
const MAX_ATTRIBUTES_PER_SPAN: usize = 32;
973
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry::{KeyValue, Value};

    /// The `AttributeValue` a string attribute is expected to be converted to.
    fn string_value(s: &'static str) -> AttributeValue {
        AttributeValue::from(Value::String(s.into()))
    }

    /// Span attributes with a `KEY_MAP` entry are stored under their translated key.
    #[test]
    fn test_attributes_mapping() {
        let attributes = vec![
            KeyValue::new(semconv::attribute::HTTP_HOST, "example.com:8080"),
            KeyValue::new(semconv::attribute::HTTP_METHOD, "POST"),
            KeyValue::new(HTTP_PATH, "/path/12314/?q=ddds#123"),
            KeyValue::new(
                semconv::attribute::HTTP_URL,
                "https://example.com:8080/webshop/articles/4?s=1",
            ),
            KeyValue::new(
                semconv::attribute::HTTP_USER_AGENT,
                "CERN-LineMode/2.15 libwww/2.17b3",
            ),
            KeyValue::new(semconv::attribute::HTTP_STATUS_CODE, 200i64),
            KeyValue::new(semconv::trace::HTTP_ROUTE, "/webshop/articles/:article_id"),
        ];

        let resources = Resource::builder_empty()
            .with_attributes([KeyValue::new(
                semconv::resource::SERVICE_NAME,
                "Test Service Name",
            )])
            .build();

        let actual = Attributes::new(attributes, Some(&resources));
        // Seven span attributes plus the single resource attribute.
        assert_eq!(actual.attribute_map.len(), 8);
        assert_eq!(actual.dropped_attributes_count, 0);

        let expected = [
            ("/http/host", string_value("example.com:8080")),
            ("/http/method", string_value("POST")),
            ("/http/path", string_value("/path/12314/?q=ddds#123")),
            ("/http/route", string_value("/webshop/articles/:article_id")),
            (
                "/http/url",
                string_value("https://example.com:8080/webshop/articles/4?s=1"),
            ),
            (
                "/http/user_agent",
                string_value("CERN-LineMode/2.15 libwww/2.17b3"),
            ),
            ("/http/status_code", AttributeValue::from(Value::I64(200))),
        ];
        for (key, value) in &expected {
            assert_eq!(
                actual.attribute_map.get(*key),
                Some(value),
                "unexpected value for {key}"
            );
        }
    }

    /// Resource attributes are inserted first, so the overflowing span attribute is the
    /// one that gets dropped.
    #[test]
    fn test_too_many() {
        let resources = Resource::builder_empty()
            .with_attributes([KeyValue::new(
                semconv::attribute::USER_AGENT_ORIGINAL,
                "Test Service Name UA",
            )])
            .build();
        let attributes: Vec<_> = (0..32)
            .map(|i| {
                KeyValue::new(
                    format!("key{}", i),
                    Value::String(format!("value{}", i).into()),
                )
            })
            .collect();

        let actual = Attributes::new(attributes, Some(&resources));
        assert_eq!(actual.attribute_map.len(), 32);
        assert_eq!(actual.dropped_attributes_count, 1);
        assert_eq!(
            actual.attribute_map.get("/http/user_agent"),
            Some(&string_value("Test Service Name UA")),
        );
    }

    /// `http.target` is stored under `/http/path`.
    #[test]
    fn test_attributes_mapping_http_target() {
        let attributes = vec![KeyValue::new(
            semconv::attribute::HTTP_TARGET,
            "/path/12314/?q=ddds#123",
        )];

        let resources = Resource::builder_empty().with_attributes([]).build();
        let actual = Attributes::new(attributes, Some(&resources));
        assert_eq!(actual.attribute_map.len(), 1);
        assert_eq!(actual.dropped_attributes_count, 0);
        assert_eq!(
            actual.attribute_map.get("/http/path"),
            Some(&string_value("/path/12314/?q=ddds#123")),
        );
    }

    /// An attribute whose key exceeds the length limit is dropped and counted.
    #[test]
    fn test_attributes_mapping_dropped_attributes_count() {
        let attributes = vec![
            KeyValue::new("answer", Value::I64(42)),
            KeyValue::new(
                "long_attribute_key_dvwmacxpeefbuemoxljmqvldjxmvvihoeqnuqdsyovwgljtnemouidabhkmvsnauwfnaihekcfwhugejboiyfthyhmkpsaxtidlsbwsmirebax",
                Value::String("Some value".into()),
            ),
        ];

        let resources = Resource::builder_empty().with_attributes([]).build();
        let actual = Attributes::new(attributes, Some(&resources));
        assert_eq!(
            actual,
            Attributes {
                attribute_map: HashMap::from([(
                    "answer".into(),
                    AttributeValue::from(Value::I64(42))
                )]),
                dropped_attributes_count: 1,
            }
        );
        assert_eq!(actual.attribute_map.len(), 1);
        assert_eq!(actual.dropped_attributes_count, 1);
    }
}