1#![cfg(not(doctest))]
2#![allow(
7 clippy::doc_lazy_continuation,
8 deprecated,
9 rustdoc::bare_urls,
10 rustdoc::broken_intra_doc_links,
11 rustdoc::invalid_rust_codeblocks
12)]
13
14use std::{
15 borrow::Cow,
16 collections::HashMap,
17 fmt,
18 future::Future,
19 sync::{
20 atomic::{AtomicUsize, Ordering},
21 Arc, RwLock,
22 },
23 time::{Duration, Instant},
24};
25
26use futures_util::stream::StreamExt;
27use opentelemetry::{otel_error, trace::SpanId, Key, KeyValue, Value};
28use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
29use opentelemetry_sdk::{
30 trace::{SpanData, SpanExporter},
31 Resource,
32};
33use opentelemetry_semantic_conventions as semconv;
34use thiserror::Error;
35#[cfg(feature = "gcp-authorizer")]
36use tonic::metadata::MetadataValue;
37use tonic::{
38 transport::{Channel, ClientTlsConfig},
39 Code, Request,
40};
41
42#[allow(clippy::derive_partial_eq_without_eq)] pub mod proto;
44
45#[cfg(feature = "propagator")]
46pub mod google_trace_context_propagator;
47
48use proto::devtools::cloudtrace::v2::span::time_event::Annotation;
49use proto::devtools::cloudtrace::v2::span::{
50 Attributes, Link, Links, SpanKind, TimeEvent, TimeEvents,
51};
52use proto::devtools::cloudtrace::v2::trace_service_client::TraceServiceClient;
53use proto::devtools::cloudtrace::v2::{
54 AttributeValue, BatchWriteSpansRequest, Span, TruncatableString,
55};
56use proto::logging::v2::{
57 log_entry::Payload, logging_service_v2_client::LoggingServiceV2Client, LogEntry,
58 LogEntrySourceLocation, WriteLogEntriesRequest,
59};
60use proto::rpc::Status;
61
62#[derive(Clone)]
67pub struct StackDriverExporter {
68 tx: futures_channel::mpsc::Sender<Vec<SpanData>>,
69 pending_count: Arc<AtomicUsize>,
70 maximum_shutdown_duration: Duration,
71 resource: Arc<RwLock<Option<Resource>>>,
72}
73
74impl StackDriverExporter {
75 pub fn builder() -> Builder {
76 Builder::default()
77 }
78
79 pub fn pending_count(&self) -> usize {
80 self.pending_count.load(Ordering::Relaxed)
81 }
82}
83
84impl SpanExporter for StackDriverExporter {
85 async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
86 match self.tx.clone().try_send(batch) {
87 Err(e) => Err(OTelSdkError::InternalFailure(format!("{:?}", e))),
88 Ok(()) => {
89 self.pending_count.fetch_add(1, Ordering::Relaxed);
90 Ok(())
91 }
92 }
93 }
94
95 fn shutdown(&mut self) -> OTelSdkResult {
96 let start = Instant::now();
97 while (Instant::now() - start) < self.maximum_shutdown_duration && self.pending_count() > 0
98 {
99 std::thread::yield_now();
100 }
102 Ok(())
103 }
104
105 fn set_resource(&mut self, resource: &Resource) {
106 match self.resource.write() {
107 Ok(mut guard) => *guard = Some(resource.clone()),
108 Err(poisoned) => *poisoned.into_inner() = Some(resource.clone()),
109 }
110 }
111}
112
113impl fmt::Debug for StackDriverExporter {
114 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115 #[allow(clippy::unneeded_field_pattern)]
116 let Self {
117 tx: _,
118 pending_count,
119 maximum_shutdown_duration,
120 resource: _,
121 } = self;
122 f.debug_struct("StackDriverExporter")
123 .field("tx", &"(elided)")
124 .field("pending_count", pending_count)
125 .field("maximum_shutdown_duration", maximum_shutdown_duration)
126 .finish()
127 }
128}
129
130#[derive(Clone, Default)]
132pub struct Builder {
133 maximum_shutdown_duration: Option<Duration>,
134 num_concurrent_requests: Option<usize>,
135 log_context: Option<LogContext>,
136}
137
138impl Builder {
139 pub fn maximum_shutdown_duration(mut self, duration: Duration) -> Self {
143 self.maximum_shutdown_duration = Some(duration);
144 self
145 }
146
147 pub fn num_concurrent_requests(mut self, num_concurrent_requests: usize) -> Self {
151 self.num_concurrent_requests = Some(num_concurrent_requests);
152 self
153 }
154
155 pub fn log_context(mut self, log_context: LogContext) -> Self {
157 self.log_context = Some(log_context);
158 self
159 }
160
161 pub async fn build<A: Authorizer>(
162 self,
163 authenticator: A,
164 ) -> Result<(StackDriverExporter, impl Future<Output = ()>), Error>
165 where
166 Error: From<A::Error>,
167 {
168 let Self {
169 maximum_shutdown_duration,
170 num_concurrent_requests,
171 log_context,
172 } = self;
173 let uri = http::uri::Uri::from_static("https://cloudtrace.googleapis.com:443");
174
175 #[cfg(all(feature = "tls-native-roots", not(feature = "tls-webpki-roots")))]
176 let tls_config = ClientTlsConfig::new().with_native_roots();
177 #[cfg(feature = "tls-webpki-roots")]
178 let tls_config = ClientTlsConfig::new().with_webpki_roots();
179 #[cfg(not(any(feature = "tls-native-roots", feature = "tls-webpki-roots")))]
180 let tls_config = ClientTlsConfig::new();
181
182 let trace_channel = Channel::builder(uri)
183 .tls_config(tls_config.clone())
184 .map_err(|e| Error::Transport(e.into()))?
185 .connect()
186 .await
187 .map_err(|e| Error::Transport(e.into()))?;
188
189 let log_client = match log_context {
190 Some(log_context) => {
191 let log_channel = Channel::builder(http::uri::Uri::from_static(
192 "https://logging.googleapis.com:443",
193 ))
194 .tls_config(tls_config)
195 .map_err(|e| Error::Transport(e.into()))?
196 .connect()
197 .await
198 .map_err(|e| Error::Transport(e.into()))?;
199
200 Some(LogClient {
201 client: LoggingServiceV2Client::new(log_channel),
202 context: Arc::new(InternalLogContext::from(log_context)),
203 })
204 }
205 None => None,
206 };
207
208 let (tx, rx) = futures_channel::mpsc::channel(64);
209 let pending_count = Arc::new(AtomicUsize::new(0));
210 let scopes = Arc::new(match log_client {
211 Some(_) => vec![TRACE_APPEND, LOGGING_WRITE],
212 None => vec![TRACE_APPEND],
213 });
214
215 let count_clone = pending_count.clone();
216 let resource = Arc::new(RwLock::new(None));
217 let ctx_resource = resource.clone();
218 let future = async move {
219 let trace_client = TraceServiceClient::new(trace_channel);
220 let authorizer = &authenticator;
221 let log_client = log_client.clone();
222 rx.for_each_concurrent(num_concurrent_requests, move |batch| {
223 let trace_client = trace_client.clone();
224 let log_client = log_client.clone();
225 let pending_count = count_clone.clone();
226 let scopes = scopes.clone();
227 let resource = ctx_resource.clone();
228 ExporterContext {
229 trace_client,
230 log_client,
231 authorizer,
232 pending_count,
233 scopes,
234 resource,
235 }
236 .export(batch)
237 })
238 .await
239 };
240
241 let exporter = StackDriverExporter {
242 tx,
243 pending_count,
244 maximum_shutdown_duration: maximum_shutdown_duration
245 .unwrap_or_else(|| Duration::from_secs(5)),
246 resource,
247 };
248
249 Ok((exporter, future))
250 }
251}
252
253struct ExporterContext<'a, A> {
254 trace_client: TraceServiceClient<Channel>,
255 log_client: Option<LogClient>,
256 authorizer: &'a A,
257 pending_count: Arc<AtomicUsize>,
258 scopes: Arc<Vec<&'static str>>,
259 resource: Arc<RwLock<Option<Resource>>>,
260}
261
262impl<A: Authorizer> ExporterContext<'_, A>
263where
264 Error: From<A::Error>,
265{
266 async fn export(mut self, batch: Vec<SpanData>) {
267 use proto::devtools::cloudtrace::v2::span::time_event::Value;
268
269 let mut entries = Vec::new();
270 let mut spans = Vec::with_capacity(batch.len());
271 for span in batch {
272 let trace_id = hex::encode(span.span_context.trace_id().to_bytes());
273 let span_id = hex::encode(span.span_context.span_id().to_bytes());
274 let time_event = match &self.log_client {
275 None => span
276 .events
277 .into_iter()
278 .map(|event| TimeEvent {
279 time: Some(event.timestamp.into()),
280 value: Some(Value::Annotation(Annotation {
281 description: Some(to_truncate(event.name.into_owned())),
282 ..Default::default()
283 })),
284 })
285 .collect(),
286 Some(client) => {
287 entries.extend(span.events.into_iter().map(|event| {
288 let (mut level, mut target, mut labels) =
289 (LogSeverity::Default, None, HashMap::default());
290 for kv in event.attributes {
291 match kv.key.as_str() {
292 "level" => {
293 level = match kv.value.as_str().as_ref() {
294 "DEBUG" | "TRACE" => LogSeverity::Debug,
295 "INFO" => LogSeverity::Info,
296 "WARN" => LogSeverity::Warning,
297 "ERROR" => LogSeverity::Error,
298 _ => LogSeverity::Default, }
300 }
301 "target" => target = Some(kv.value.as_str().into_owned()),
302 key => {
303 labels.insert(key.to_owned(), kv.value.as_str().into_owned());
304 }
305 }
306 }
307 let project_id = self.authorizer.project_id();
308 let log_id = &client.context.log_id;
309 LogEntry {
310 log_name: format!("projects/{project_id}/logs/{log_id}"),
311 resource: Some(client.context.resource.clone()),
312 severity: level as i32,
313 timestamp: Some(event.timestamp.into()),
314 labels,
315 trace: format!("projects/{project_id}/traces/{trace_id}"),
316 span_id: span_id.clone(),
317 source_location: target.map(|target| LogEntrySourceLocation {
318 file: String::new(),
319 line: 0,
320 function: target,
321 }),
322 payload: Some(Payload::TextPayload(event.name.into_owned())),
323 ..Default::default()
325 }
326 }));
327
328 vec![]
329 }
330 };
331
332 let resource = self.resource.read().ok();
333 let attributes = match resource {
334 Some(resource) => Attributes::new(span.attributes, resource.as_ref()),
335 None => Attributes::new(span.attributes, None),
336 };
337
338 spans.push(Span {
339 name: format!(
340 "projects/{}/traces/{}/spans/{}",
341 self.authorizer.project_id(),
342 hex::encode(span.span_context.trace_id().to_bytes()),
343 hex::encode(span.span_context.span_id().to_bytes())
344 ),
345 display_name: Some(to_truncate(span.name.into_owned())),
346 span_id: hex::encode(span.span_context.span_id().to_bytes()),
347 parent_span_id: match span.parent_span_id {
350 SpanId::INVALID => "".to_owned(),
351 _ => hex::encode(span.parent_span_id.to_bytes()),
352 },
353 start_time: Some(span.start_time.into()),
354 end_time: Some(span.end_time.into()),
355 attributes: Some(attributes),
356 time_events: Some(TimeEvents {
357 time_event,
358 ..Default::default()
359 }),
360 links: transform_links(&span.links),
361 status: status(span.status),
362 span_kind: SpanKind::from(span.span_kind) as i32,
363 ..Default::default()
364 });
365 }
366
367 let mut req = Request::new(BatchWriteSpansRequest {
368 name: format!("projects/{}", self.authorizer.project_id()),
369 spans,
370 });
371
372 self.pending_count.fetch_sub(1, Ordering::Relaxed);
373 if let Err(e) = self.authorizer.authorize(&mut req, &self.scopes).await {
374 otel_error!(name: "ExportAuthorizeError", error = format!("{:?}", e));
375 } else if let Err(e) = self.trace_client.batch_write_spans(req).await {
376 otel_error!(name: "ExportTransportError", error = format!("{:?}", e));
377 }
378
379 let client = match &mut self.log_client {
380 Some(client) => client,
381 None => return,
382 };
383
384 let mut req = Request::new(WriteLogEntriesRequest {
385 log_name: format!(
386 "projects/{}/logs/{}",
387 self.authorizer.project_id(),
388 client.context.log_id,
389 ),
390 entries,
391 dry_run: false,
392 labels: HashMap::default(),
393 partial_success: true,
394 resource: None,
395 });
396
397 if let Err(e) = self.authorizer.authorize(&mut req, &self.scopes).await {
398 otel_error!(name: "ExportAuthorizeError", error = format!("{:?}", e));
399 } else if let Err(e) = client.client.write_log_entries(req).await {
400 otel_error!(name: "ExportTransportError", error = format!("{:?}", e));
401 }
402 }
403}
404
405#[cfg(feature = "gcp-authorizer")]
406pub struct GcpAuthorizer {
407 provider: Arc<dyn gcp_auth::TokenProvider>,
408 project_id: Arc<str>,
409}
410
411#[cfg(feature = "gcp-authorizer")]
412impl GcpAuthorizer {
413 pub async fn new() -> Result<Self, Error> {
414 let provider = gcp_auth::provider()
415 .await
416 .map_err(|e| Error::Authorizer(e.into()))?;
417
418 let project_id = provider
419 .project_id()
420 .await
421 .map_err(|e| Error::Authorizer(e.into()))?;
422
423 Ok(Self {
424 provider,
425 project_id,
426 })
427 }
428 pub fn from_gcp_auth(provider: Arc<dyn gcp_auth::TokenProvider>, project_id: Arc<str>) -> Self {
429 Self {
430 provider,
431 project_id,
432 }
433 }
434}
435
436#[cfg(feature = "gcp-authorizer")]
437impl Authorizer for GcpAuthorizer {
438 type Error = Error;
439
440 fn project_id(&self) -> &str {
441 &self.project_id
442 }
443
444 async fn authorize<T: Send + Sync>(
445 &self,
446 req: &mut Request<T>,
447 scopes: &[&str],
448 ) -> Result<(), Self::Error> {
449 let token = self
450 .provider
451 .token(scopes)
452 .await
453 .map_err(|e| Error::Authorizer(e.into()))?;
454
455 req.metadata_mut().insert(
456 "authorization",
457 MetadataValue::try_from(format!("Bearer {}", token.as_str())).unwrap(),
458 );
459
460 Ok(())
461 }
462}
463
464pub trait Authorizer: Sync + Send + 'static {
465 type Error: std::error::Error + fmt::Debug + Send + Sync;
466
467 fn project_id(&self) -> &str;
468 fn authorize<T: Send + Sync>(
469 &self,
470 request: &mut Request<T>,
471 scopes: &[&str],
472 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
473}
474
475impl From<Value> for AttributeValue {
476 fn from(v: Value) -> AttributeValue {
477 use proto::devtools::cloudtrace::v2::attribute_value;
478 let new_value = match v {
479 Value::Bool(v) => attribute_value::Value::BoolValue(v),
480 Value::F64(v) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
481 Value::I64(v) => attribute_value::Value::IntValue(v),
482 Value::String(v) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
483 Value::Array(_) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
484 _ => attribute_value::Value::StringValue(to_truncate("".to_string())),
485 };
486 AttributeValue {
487 value: Some(new_value),
488 }
489 }
490}
491
492fn to_truncate(s: String) -> TruncatableString {
493 TruncatableString {
494 value: s,
495 ..Default::default()
496 }
497}
498
499#[derive(Debug, Error)]
500pub enum Error {
501 #[error("authorizer error: {0}")]
502 Authorizer(#[source] Box<dyn std::error::Error + Send + Sync>),
503 #[error("I/O error: {0}")]
504 Io(#[from] std::io::Error),
505 #[error("{0}")]
506 Other(#[from] Box<dyn std::error::Error + Send + Sync>),
507 #[error("tonic error: {0}")]
508 Transport(#[source] Box<dyn std::error::Error + Send + Sync>),
509}
510
511impl opentelemetry_sdk::ExportError for Error {
512 fn exporter_name(&self) -> &'static str {
513 "stackdriver"
514 }
515}
516
517enum LogSeverity {
519 Default = 0,
520 Debug = 100,
521 Info = 200,
522 Warning = 400,
523 Error = 500,
524}
525
526#[derive(Clone)]
527struct LogClient {
528 client: LoggingServiceV2Client<Channel>,
529 context: Arc<InternalLogContext>,
530}
531
532struct InternalLogContext {
533 log_id: String,
534 resource: proto::api::MonitoredResource,
535}
536
537#[derive(Clone)]
538pub struct LogContext {
539 pub log_id: String,
540 pub resource: MonitoredResource,
541}
542
543impl From<LogContext> for InternalLogContext {
544 fn from(cx: LogContext) -> Self {
545 let mut labels = HashMap::default();
546 let resource = match cx.resource {
547 MonitoredResource::AppEngine {
548 project_id,
549 module_id,
550 version_id,
551 zone,
552 } => {
553 labels.insert("project_id".to_string(), project_id);
554 if let Some(module_id) = module_id {
555 labels.insert("module_id".to_string(), module_id);
556 }
557 if let Some(version_id) = version_id {
558 labels.insert("version_id".to_string(), version_id);
559 }
560 if let Some(zone) = zone {
561 labels.insert("zone".to_string(), zone);
562 }
563
564 proto::api::MonitoredResource {
565 r#type: "gae_app".to_owned(),
566 labels,
567 }
568 }
569 MonitoredResource::CloudFunction {
570 project_id,
571 function_name,
572 region,
573 } => {
574 labels.insert("project_id".to_string(), project_id);
575 if let Some(function_name) = function_name {
576 labels.insert("function_name".to_string(), function_name);
577 }
578 if let Some(region) = region {
579 labels.insert("region".to_string(), region);
580 }
581
582 proto::api::MonitoredResource {
583 r#type: "cloud_function".to_owned(),
584 labels,
585 }
586 }
587 MonitoredResource::CloudRunJob {
588 project_id,
589 job_name,
590 location,
591 } => {
592 labels.insert("project_id".to_string(), project_id);
593 if let Some(job_name) = job_name {
594 labels.insert("job_name".to_string(), job_name);
595 }
596 if let Some(location) = location {
597 labels.insert("location".to_string(), location);
598 }
599
600 proto::api::MonitoredResource {
601 r#type: "cloud_run_job".to_owned(),
602 labels,
603 }
604 }
605 MonitoredResource::CloudRunRevision {
606 project_id,
607 service_name,
608 revision_name,
609 location,
610 configuration_name,
611 } => {
612 labels.insert("project_id".to_string(), project_id);
613 if let Some(service_name) = service_name {
614 labels.insert("service_name".to_string(), service_name);
615 }
616 if let Some(revision_name) = revision_name {
617 labels.insert("revision_name".to_string(), revision_name);
618 }
619 if let Some(location) = location {
620 labels.insert("location".to_string(), location);
621 }
622 if let Some(configuration_name) = configuration_name {
623 labels.insert("configuration_name".to_string(), configuration_name);
624 }
625
626 proto::api::MonitoredResource {
627 r#type: "cloud_run_revision".to_owned(),
628 labels,
629 }
630 }
631
632 MonitoredResource::ComputeEngine {
633 project_id,
634 instance_id,
635 zone,
636 } => {
637 labels.insert("project_id".to_string(), project_id);
638 if let Some(instance_id) = instance_id {
639 labels.insert("instance_id".to_string(), instance_id);
640 }
641 if let Some(zone) = zone {
642 labels.insert("zone".to_string(), zone);
643 }
644
645 proto::api::MonitoredResource {
646 r#type: "gce_instance".to_owned(),
647 labels,
648 }
649 }
650
651 MonitoredResource::GenericNode {
652 project_id,
653 location,
654 namespace,
655 node_id,
656 } => {
657 labels.insert("project_id".to_string(), project_id);
658 if let Some(location) = location {
659 labels.insert("location".to_string(), location);
660 }
661 if let Some(namespace) = namespace {
662 labels.insert("namespace".to_string(), namespace);
663 }
664 if let Some(node_id) = node_id {
665 labels.insert("node_id".to_string(), node_id);
666 }
667
668 proto::api::MonitoredResource {
669 r#type: "generic_node".to_owned(),
670 labels,
671 }
672 }
673 MonitoredResource::GenericTask {
674 project_id,
675 location,
676 namespace,
677 job,
678 task_id,
679 } => {
680 labels.insert("project_id".to_owned(), project_id);
681 if let Some(location) = location {
682 labels.insert("location".to_owned(), location);
683 }
684 if let Some(namespace) = namespace {
685 labels.insert("namespace".to_owned(), namespace);
686 }
687 if let Some(job) = job {
688 labels.insert("job".to_owned(), job);
689 }
690 if let Some(task_id) = task_id {
691 labels.insert("task_id".to_owned(), task_id);
692 }
693
694 proto::api::MonitoredResource {
695 r#type: "generic_task".to_owned(),
696 labels,
697 }
698 }
699 MonitoredResource::Global { project_id } => {
700 labels.insert("project_id".to_owned(), project_id);
701 proto::api::MonitoredResource {
702 r#type: "global".to_owned(),
703 labels,
704 }
705 }
706 MonitoredResource::KubernetesEngine {
707 project_id,
708 cluster_name,
709 location,
710 pod_name,
711 namespace_name,
712 container_name,
713 } => {
714 labels.insert("project_id".to_string(), project_id);
715 if let Some(cluster_name) = cluster_name {
716 labels.insert("cluster_name".to_string(), cluster_name);
717 }
718 if let Some(location) = location {
719 labels.insert("location".to_string(), location);
720 }
721 if let Some(pod_name) = pod_name {
722 labels.insert("pod_name".to_string(), pod_name);
723 }
724 if let Some(namespace_name) = namespace_name {
725 labels.insert("namespace_name".to_string(), namespace_name);
726 }
727 if let Some(container_name) = container_name {
728 labels.insert("container_name".to_string(), container_name);
729 }
730
731 proto::api::MonitoredResource {
732 r#type: "k8s_container".to_owned(),
733 labels,
734 }
735 }
736 };
737
738 Self {
739 log_id: cx.log_id,
740 resource,
741 }
742 }
743}
744
745#[derive(Clone)]
750pub enum MonitoredResource {
751 AppEngine {
752 project_id: String,
753 module_id: Option<String>,
754 version_id: Option<String>,
755 zone: Option<String>,
756 },
757 CloudFunction {
758 project_id: String,
759 function_name: Option<String>,
760 region: Option<String>,
761 },
762 CloudRunJob {
763 project_id: String,
764 job_name: Option<String>,
765 location: Option<String>,
766 },
767 CloudRunRevision {
768 project_id: String,
769 service_name: Option<String>,
770 revision_name: Option<String>,
771 location: Option<String>,
772 configuration_name: Option<String>,
773 },
774 ComputeEngine {
775 project_id: String,
776 instance_id: Option<String>,
777 zone: Option<String>,
778 },
779 KubernetesEngine {
780 project_id: String,
781 location: Option<String>,
782 cluster_name: Option<String>,
783 namespace_name: Option<String>,
784 pod_name: Option<String>,
785 container_name: Option<String>,
786 },
787 GenericNode {
788 project_id: String,
789 location: Option<String>,
790 namespace: Option<String>,
791 node_id: Option<String>,
792 },
793 GenericTask {
794 project_id: String,
795 location: Option<String>,
796 namespace: Option<String>,
797 job: Option<String>,
798 task_id: Option<String>,
799 },
800 Global {
801 project_id: String,
802 },
803}
804
805impl Attributes {
806 fn new(attributes: Vec<KeyValue>, resource: Option<&Resource>) -> Self {
810 let mut new = Self {
811 dropped_attributes_count: 0,
812 attribute_map: HashMap::with_capacity(Ord::min(
813 MAX_ATTRIBUTES_PER_SPAN,
814 attributes.len() + resource.map_or(0, |r| r.len()),
815 )),
816 };
817
818 if let Some(resource) = resource {
819 for (k, v) in resource.iter() {
820 new.push(Cow::Borrowed(k), Cow::Borrowed(v));
821 }
822 }
823
824 for kv in attributes {
825 new.push(Cow::Owned(kv.key), Cow::Owned(kv.value));
826 }
827
828 new
829 }
830
831 fn push(&mut self, key: Cow<'_, Key>, value: Cow<'_, Value>) {
832 if self.attribute_map.len() >= MAX_ATTRIBUTES_PER_SPAN {
833 self.dropped_attributes_count += 1;
834 return;
835 }
836
837 let key_str = key.as_str();
838 if key_str.len() > 128 {
839 self.dropped_attributes_count += 1;
840 return;
841 }
842
843 for (otel_key, gcp_key) in KEY_MAP {
844 if otel_key == key_str {
845 self.attribute_map
846 .insert(gcp_key.to_owned(), value.into_owned().into());
847 return;
848 }
849 }
850
851 self.attribute_map.insert(
852 match key {
853 Cow::Owned(k) => k.to_string(),
854 Cow::Borrowed(k) => k.to_string(),
855 },
856 value.into_owned().into(),
857 );
858 }
859}
860
861fn transform_links(links: &opentelemetry_sdk::trace::SpanLinks) -> Option<Links> {
862 if links.is_empty() {
863 return None;
864 }
865
866 Some(Links {
867 dropped_links_count: links.dropped_count as i32,
868 link: links
869 .iter()
870 .map(|link| Link {
871 trace_id: hex::encode(link.span_context.trace_id().to_bytes()),
872 span_id: hex::encode(link.span_context.span_id().to_bytes()),
873 ..Default::default()
874 })
875 .collect(),
876 })
877}
878
879const KEY_MAP: [(&str, &str); 19] = [
883 (HTTP_PATH, GCP_HTTP_PATH),
884 (semconv::attribute::HTTP_HOST, "/http/host"),
885 ("http.request.header.host", "/http/host"),
886 (semconv::attribute::HTTP_METHOD, "/http/method"),
887 (semconv::attribute::HTTP_REQUEST_METHOD, "/http/method"),
888 (semconv::attribute::HTTP_TARGET, "/http/path"),
889 (semconv::attribute::URL_PATH, "/http/path"),
890 (semconv::attribute::HTTP_URL, "/http/url"),
891 (semconv::attribute::URL_FULL, "/http/url"),
892 (semconv::attribute::HTTP_USER_AGENT, "/http/user_agent"),
893 (semconv::attribute::USER_AGENT_ORIGINAL, "/http/user_agent"),
894 (semconv::attribute::HTTP_STATUS_CODE, "/http/status_code"),
895 (
897 semconv::attribute::HTTP_RESPONSE_STATUS_CODE,
898 "/http/status_code",
899 ),
900 (
901 semconv::attribute::K8S_CLUSTER_NAME,
902 "g.co/r/k8s_container/cluster_name",
903 ),
904 (
905 semconv::attribute::K8S_NAMESPACE_NAME,
906 "g.co/r/k8s_container/namespace",
907 ),
908 (
909 semconv::attribute::K8S_POD_NAME,
910 "g.co/r/k8s_container/pod_name",
911 ),
912 (
913 semconv::attribute::K8S_CONTAINER_NAME,
914 "g.co/r/k8s_container/container_name",
915 ),
916 (semconv::trace::HTTP_ROUTE, "/http/route"),
917 (HTTP_PATH, GCP_HTTP_PATH),
918];
919
920const HTTP_PATH: &str = "http.path";
921const GCP_HTTP_PATH: &str = "/http/path";
922
923impl From<opentelemetry::trace::SpanKind> for SpanKind {
924 fn from(span_kind: opentelemetry::trace::SpanKind) -> Self {
925 match span_kind {
926 opentelemetry::trace::SpanKind::Client => SpanKind::Client,
927 opentelemetry::trace::SpanKind::Server => SpanKind::Server,
928 opentelemetry::trace::SpanKind::Producer => SpanKind::Producer,
929 opentelemetry::trace::SpanKind::Consumer => SpanKind::Consumer,
930 opentelemetry::trace::SpanKind::Internal => SpanKind::Internal,
931 }
932 }
933}
934
935fn status(value: opentelemetry::trace::Status) -> Option<Status> {
936 match value {
937 opentelemetry::trace::Status::Ok => Some(Status {
938 code: Code::Ok as i32,
939 message: "".to_owned(),
940 details: vec![],
941 }),
942 opentelemetry::trace::Status::Unset => None,
943 opentelemetry::trace::Status::Error { description } => Some(Status {
944 code: Code::Unknown as i32,
945 message: description.into(),
946 details: vec![],
947 }),
948 }
949}
950const TRACE_APPEND: &str = "https://www.googleapis.com/auth/trace.append";
951const LOGGING_WRITE: &str = "https://www.googleapis.com/auth/logging.write";
952const MAX_ATTRIBUTES_PER_SPAN: usize = 32;
953
954#[cfg(test)]
955mod tests {
956 use super::*;
957 use opentelemetry::{KeyValue, Value};
958 use opentelemetry_semantic_conventions as semcov;
959
960 #[test]
961 fn test_attributes_mapping() {
962 let capacity = 10;
963 let mut attributes = Vec::with_capacity(capacity);
964
965 attributes.push(KeyValue::new(
967 semconv::attribute::HTTP_HOST,
968 "example.com:8080",
969 ));
970
971 attributes.push(KeyValue::new(semcov::attribute::HTTP_METHOD, "POST"));
973
974 attributes.push(KeyValue::new(HTTP_PATH, "/path/12314/?q=ddds#123"));
976
977 attributes.push(KeyValue::new(
979 semcov::attribute::HTTP_URL,
980 "https://example.com:8080/webshop/articles/4?s=1",
981 ));
982
983 attributes.push(KeyValue::new(
985 semconv::attribute::HTTP_USER_AGENT,
986 "CERN-LineMode/2.15 libwww/2.17b3",
987 ));
988
989 attributes.push(KeyValue::new(semcov::attribute::HTTP_STATUS_CODE, 200i64));
991
992 attributes.push(KeyValue::new(
994 semcov::trace::HTTP_ROUTE,
995 "/webshop/articles/:article_id",
996 ));
997
998 let resources = Resource::builder_empty()
1000 .with_attributes([KeyValue::new(
1001 semcov::resource::SERVICE_NAME,
1002 "Test Service Name",
1003 )])
1004 .build();
1005
1006 let actual = Attributes::new(attributes, Some(&resources));
1007 assert_eq!(actual.attribute_map.len(), 8);
1008 assert_eq!(actual.dropped_attributes_count, 0);
1009 assert_eq!(
1010 actual.attribute_map.get("/http/host"),
1011 Some(&AttributeValue::from(Value::String(
1012 "example.com:8080".into()
1013 )))
1014 );
1015 assert_eq!(
1016 actual.attribute_map.get("/http/method"),
1017 Some(&AttributeValue::from(Value::String("POST".into()))),
1018 );
1019 assert_eq!(
1020 actual.attribute_map.get("/http/path"),
1021 Some(&AttributeValue::from(Value::String(
1022 "/path/12314/?q=ddds#123".into()
1023 ))),
1024 );
1025 assert_eq!(
1026 actual.attribute_map.get("/http/route"),
1027 Some(&AttributeValue::from(Value::String(
1028 "/webshop/articles/:article_id".into()
1029 ))),
1030 );
1031 assert_eq!(
1032 actual.attribute_map.get("/http/url"),
1033 Some(&AttributeValue::from(Value::String(
1034 "https://example.com:8080/webshop/articles/4?s=1".into(),
1035 ))),
1036 );
1037 assert_eq!(
1038 actual.attribute_map.get("/http/user_agent"),
1039 Some(&AttributeValue::from(Value::String(
1040 "CERN-LineMode/2.15 libwww/2.17b3".into()
1041 ))),
1042 );
1043 assert_eq!(
1044 actual.attribute_map.get("/http/status_code"),
1045 Some(&AttributeValue::from(Value::I64(200))),
1046 );
1047 }
1048
1049 #[test]
1050 fn test_too_many() {
1051 let resources = Resource::builder_empty()
1052 .with_attributes([KeyValue::new(
1053 semconv::attribute::USER_AGENT_ORIGINAL,
1054 "Test Service Name UA",
1055 )])
1056 .build();
1057 let mut attributes = Vec::with_capacity(32);
1058 for i in 0..32 {
1059 attributes.push(KeyValue::new(
1060 format!("key{}", i),
1061 Value::String(format!("value{}", i).into()),
1062 ));
1063 }
1064
1065 let actual = Attributes::new(attributes, Some(&resources));
1066 assert_eq!(actual.attribute_map.len(), 32);
1067 assert_eq!(actual.dropped_attributes_count, 1);
1068 assert_eq!(
1069 actual.attribute_map.get("/http/user_agent"),
1070 Some(&AttributeValue::from(Value::String(
1071 "Test Service Name UA".into()
1072 ))),
1073 );
1074 }
1075
1076 #[test]
1077 fn test_attributes_mapping_http_target() {
1078 let attributes = vec![KeyValue::new(
1079 semcov::attribute::HTTP_TARGET,
1080 "/path/12314/?q=ddds#123",
1081 )];
1082
1083 let resources = Resource::builder_empty().with_attributes([]).build();
1086 let actual = Attributes::new(attributes, Some(&resources));
1087 assert_eq!(actual.attribute_map.len(), 1);
1088 assert_eq!(actual.dropped_attributes_count, 0);
1089 assert_eq!(
1090 actual.attribute_map.get("/http/path"),
1091 Some(&AttributeValue::from(Value::String(
1092 "/path/12314/?q=ddds#123".into()
1093 ))),
1094 );
1095 }
1096
1097 #[test]
1098 fn test_attributes_mapping_dropped_attributes_count() {
1099 let attributes = vec![KeyValue::new("answer", Value::I64(42)),KeyValue::new("long_attribute_key_dvwmacxpeefbuemoxljmqvldjxmvvihoeqnuqdsyovwgljtnemouidabhkmvsnauwfnaihekcfwhugejboiyfthyhmkpsaxtidlsbwsmirebax", Value::String("Some value".into()))];
1100
1101 let resources = Resource::builder_empty().with_attributes([]).build();
1102 let actual = Attributes::new(attributes, Some(&resources));
1103 assert_eq!(
1104 actual,
1105 Attributes {
1106 attribute_map: HashMap::from([(
1107 "answer".into(),
1108 AttributeValue::from(Value::I64(42))
1109 ),]),
1110 dropped_attributes_count: 1,
1111 }
1112 );
1113 assert_eq!(actual.attribute_map.len(), 1);
1114 assert_eq!(actual.dropped_attributes_count, 1);
1115 }
1116}