1#![cfg(not(doctest))]
2#![allow(
7 clippy::doc_lazy_continuation,
8 deprecated,
9 rustdoc::bare_urls,
10 rustdoc::broken_intra_doc_links,
11 rustdoc::invalid_rust_codeblocks
12)]
13
14use std::{
15 borrow::Cow,
16 collections::HashMap,
17 fmt,
18 future::Future,
19 sync::{
20 atomic::{AtomicUsize, Ordering},
21 Arc, RwLock,
22 },
23 time::{Duration, Instant},
24};
25
26use async_trait::async_trait;
27use futures_core::future::BoxFuture;
28use futures_util::stream::StreamExt;
29use opentelemetry::{
30 otel_error,
31 trace::{SpanId, TraceError},
32 Key, KeyValue, Value,
33};
34use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
35use opentelemetry_sdk::{
36 trace::{SpanData, SpanExporter},
37 Resource,
38};
39use opentelemetry_semantic_conventions as semconv;
40use thiserror::Error;
41#[cfg(feature = "gcp-authorizer")]
42use tonic::metadata::MetadataValue;
43use tonic::{
44 transport::{Channel, ClientTlsConfig},
45 Code, Request,
46};
47
48#[allow(clippy::derive_partial_eq_without_eq)] pub mod proto;
50
51#[cfg(feature = "propagator")]
52pub mod google_trace_context_propagator;
53
54use proto::devtools::cloudtrace::v2::span::time_event::Annotation;
55use proto::devtools::cloudtrace::v2::span::{
56 Attributes, Link, Links, SpanKind, TimeEvent, TimeEvents,
57};
58use proto::devtools::cloudtrace::v2::trace_service_client::TraceServiceClient;
59use proto::devtools::cloudtrace::v2::{
60 AttributeValue, BatchWriteSpansRequest, Span, TruncatableString,
61};
62use proto::logging::v2::{
63 log_entry::Payload, logging_service_v2_client::LoggingServiceV2Client, LogEntry,
64 LogEntrySourceLocation, WriteLogEntriesRequest,
65};
66use proto::rpc::Status;
67
68#[derive(Clone)]
73pub struct StackDriverExporter {
74 tx: futures_channel::mpsc::Sender<Vec<SpanData>>,
75 pending_count: Arc<AtomicUsize>,
76 maximum_shutdown_duration: Duration,
77 resource: Arc<RwLock<Option<Resource>>>,
78}
79
80impl StackDriverExporter {
81 pub fn builder() -> Builder {
82 Builder::default()
83 }
84
85 pub fn pending_count(&self) -> usize {
86 self.pending_count.load(Ordering::Relaxed)
87 }
88}
89
90impl SpanExporter for StackDriverExporter {
91 fn export(&mut self, batch: Vec<SpanData>) -> BoxFuture<'static, OTelSdkResult> {
92 match self.tx.try_send(batch) {
93 Err(e) => Box::pin(std::future::ready(Err(OTelSdkError::InternalFailure(
94 format!("{:?}", e),
95 )))),
96 Ok(()) => {
97 self.pending_count.fetch_add(1, Ordering::Relaxed);
98 Box::pin(std::future::ready(Ok(())))
99 }
100 }
101 }
102
103 fn shutdown(&mut self) -> OTelSdkResult {
104 let start = Instant::now();
105 while (Instant::now() - start) < self.maximum_shutdown_duration && self.pending_count() > 0
106 {
107 std::thread::yield_now();
108 }
110 Ok(())
111 }
112
113 fn set_resource(&mut self, resource: &Resource) {
114 match self.resource.write() {
115 Ok(mut guard) => *guard = Some(resource.clone()),
116 Err(poisoned) => *poisoned.into_inner() = Some(resource.clone()),
117 }
118 }
119}
120
121impl fmt::Debug for StackDriverExporter {
122 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
123 #[allow(clippy::unneeded_field_pattern)]
124 let Self {
125 tx: _,
126 pending_count,
127 maximum_shutdown_duration,
128 resource: _,
129 } = self;
130 f.debug_struct("StackDriverExporter")
131 .field("tx", &"(elided)")
132 .field("pending_count", pending_count)
133 .field("maximum_shutdown_duration", maximum_shutdown_duration)
134 .finish()
135 }
136}
137
138#[derive(Clone, Default)]
140pub struct Builder {
141 maximum_shutdown_duration: Option<Duration>,
142 num_concurrent_requests: Option<usize>,
143 log_context: Option<LogContext>,
144}
145
146impl Builder {
147 pub fn maximum_shutdown_duration(mut self, duration: Duration) -> Self {
151 self.maximum_shutdown_duration = Some(duration);
152 self
153 }
154
155 pub fn num_concurrent_requests(mut self, num_concurrent_requests: usize) -> Self {
159 self.num_concurrent_requests = Some(num_concurrent_requests);
160 self
161 }
162
163 pub fn log_context(mut self, log_context: LogContext) -> Self {
165 self.log_context = Some(log_context);
166 self
167 }
168
169 pub async fn build<A: Authorizer>(
170 self,
171 authenticator: A,
172 ) -> Result<(StackDriverExporter, impl Future<Output = ()>), Error>
173 where
174 Error: From<A::Error>,
175 {
176 let Self {
177 maximum_shutdown_duration,
178 num_concurrent_requests,
179 log_context,
180 } = self;
181 let uri = http::uri::Uri::from_static("https://cloudtrace.googleapis.com:443");
182
183 #[cfg(all(feature = "tls-native-roots", not(feature = "tls-webpki-roots")))]
184 let tls_config = ClientTlsConfig::new().with_native_roots();
185 #[cfg(feature = "tls-webpki-roots")]
186 let tls_config = ClientTlsConfig::new().with_webpki_roots();
187 #[cfg(not(any(feature = "tls-native-roots", feature = "tls-webpki-roots")))]
188 let tls_config = ClientTlsConfig::new();
189
190 let trace_channel = Channel::builder(uri)
191 .tls_config(tls_config.clone())
192 .map_err(|e| Error::Transport(e.into()))?
193 .connect()
194 .await
195 .map_err(|e| Error::Transport(e.into()))?;
196
197 let log_client = match log_context {
198 Some(log_context) => {
199 let log_channel = Channel::builder(http::uri::Uri::from_static(
200 "https://logging.googleapis.com:443",
201 ))
202 .tls_config(tls_config)
203 .map_err(|e| Error::Transport(e.into()))?
204 .connect()
205 .await
206 .map_err(|e| Error::Transport(e.into()))?;
207
208 Some(LogClient {
209 client: LoggingServiceV2Client::new(log_channel),
210 context: Arc::new(InternalLogContext::from(log_context)),
211 })
212 }
213 None => None,
214 };
215
216 let (tx, rx) = futures_channel::mpsc::channel(64);
217 let pending_count = Arc::new(AtomicUsize::new(0));
218 let scopes = Arc::new(match log_client {
219 Some(_) => vec![TRACE_APPEND, LOGGING_WRITE],
220 None => vec![TRACE_APPEND],
221 });
222
223 let count_clone = pending_count.clone();
224 let resource = Arc::new(RwLock::new(None));
225 let ctx_resource = resource.clone();
226 let future = async move {
227 let trace_client = TraceServiceClient::new(trace_channel);
228 let authorizer = &authenticator;
229 let log_client = log_client.clone();
230 rx.for_each_concurrent(num_concurrent_requests, move |batch| {
231 let trace_client = trace_client.clone();
232 let log_client = log_client.clone();
233 let pending_count = count_clone.clone();
234 let scopes = scopes.clone();
235 let resource = ctx_resource.clone();
236 ExporterContext {
237 trace_client,
238 log_client,
239 authorizer,
240 pending_count,
241 scopes,
242 resource,
243 }
244 .export(batch)
245 })
246 .await
247 };
248
249 let exporter = StackDriverExporter {
250 tx,
251 pending_count,
252 maximum_shutdown_duration: maximum_shutdown_duration
253 .unwrap_or_else(|| Duration::from_secs(5)),
254 resource,
255 };
256
257 Ok((exporter, future))
258 }
259}
260
261struct ExporterContext<'a, A> {
262 trace_client: TraceServiceClient<Channel>,
263 log_client: Option<LogClient>,
264 authorizer: &'a A,
265 pending_count: Arc<AtomicUsize>,
266 scopes: Arc<Vec<&'static str>>,
267 resource: Arc<RwLock<Option<Resource>>>,
268}
269
270impl<A: Authorizer> ExporterContext<'_, A>
271where
272 Error: From<A::Error>,
273{
274 async fn export(mut self, batch: Vec<SpanData>) {
275 use proto::devtools::cloudtrace::v2::span::time_event::Value;
276
277 let mut entries = Vec::new();
278 let mut spans = Vec::with_capacity(batch.len());
279 for span in batch {
280 let trace_id = hex::encode(span.span_context.trace_id().to_bytes());
281 let span_id = hex::encode(span.span_context.span_id().to_bytes());
282 let time_event = match &self.log_client {
283 None => span
284 .events
285 .into_iter()
286 .map(|event| TimeEvent {
287 time: Some(event.timestamp.into()),
288 value: Some(Value::Annotation(Annotation {
289 description: Some(to_truncate(event.name.into_owned())),
290 ..Default::default()
291 })),
292 })
293 .collect(),
294 Some(client) => {
295 entries.extend(span.events.into_iter().map(|event| {
296 let (mut level, mut target, mut labels) =
297 (LogSeverity::Default, None, HashMap::default());
298 for kv in event.attributes {
299 match kv.key.as_str() {
300 "level" => {
301 level = match kv.value.as_str().as_ref() {
302 "DEBUG" | "TRACE" => LogSeverity::Debug,
303 "INFO" => LogSeverity::Info,
304 "WARN" => LogSeverity::Warning,
305 "ERROR" => LogSeverity::Error,
306 _ => LogSeverity::Default, }
308 }
309 "target" => target = Some(kv.value.as_str().into_owned()),
310 key => {
311 labels.insert(key.to_owned(), kv.value.as_str().into_owned());
312 }
313 }
314 }
315 let project_id = self.authorizer.project_id();
316 let log_id = &client.context.log_id;
317 LogEntry {
318 log_name: format!("projects/{project_id}/logs/{log_id}"),
319 resource: Some(client.context.resource.clone()),
320 severity: level as i32,
321 timestamp: Some(event.timestamp.into()),
322 labels,
323 trace: format!("projects/{project_id}/traces/{trace_id}"),
324 span_id: span_id.clone(),
325 source_location: target.map(|target| LogEntrySourceLocation {
326 file: String::new(),
327 line: 0,
328 function: target,
329 }),
330 payload: Some(Payload::TextPayload(event.name.into_owned())),
331 ..Default::default()
333 }
334 }));
335
336 vec![]
337 }
338 };
339
340 let resource = self.resource.read().ok();
341 let attributes = match resource {
342 Some(resource) => Attributes::new(span.attributes, resource.as_ref()),
343 None => Attributes::new(span.attributes, None),
344 };
345
346 spans.push(Span {
347 name: format!(
348 "projects/{}/traces/{}/spans/{}",
349 self.authorizer.project_id(),
350 hex::encode(span.span_context.trace_id().to_bytes()),
351 hex::encode(span.span_context.span_id().to_bytes())
352 ),
353 display_name: Some(to_truncate(span.name.into_owned())),
354 span_id: hex::encode(span.span_context.span_id().to_bytes()),
355 parent_span_id: match span.parent_span_id {
358 SpanId::INVALID => "".to_owned(),
359 _ => hex::encode(span.parent_span_id.to_bytes()),
360 },
361 start_time: Some(span.start_time.into()),
362 end_time: Some(span.end_time.into()),
363 attributes: Some(attributes),
364 time_events: Some(TimeEvents {
365 time_event,
366 ..Default::default()
367 }),
368 links: transform_links(&span.links),
369 status: status(span.status),
370 span_kind: SpanKind::from(span.span_kind) as i32,
371 ..Default::default()
372 });
373 }
374
375 let mut req = Request::new(BatchWriteSpansRequest {
376 name: format!("projects/{}", self.authorizer.project_id()),
377 spans,
378 });
379
380 self.pending_count.fetch_sub(1, Ordering::Relaxed);
381 if let Err(e) = self.authorizer.authorize(&mut req, &self.scopes).await {
382 otel_error!(name: "ExportAuthorizeError", error = format!("{:?}", TraceError::from(Error::Authorizer(e.into()))));
383 } else if let Err(e) = self.trace_client.batch_write_spans(req).await {
384 otel_error!(name: "ExportTransportError", error = format!("{:?}", TraceError::from(Error::Transport(e.into()))));
385 }
386
387 let client = match &mut self.log_client {
388 Some(client) => client,
389 None => return,
390 };
391
392 let mut req = Request::new(WriteLogEntriesRequest {
393 log_name: format!(
394 "projects/{}/logs/{}",
395 self.authorizer.project_id(),
396 client.context.log_id,
397 ),
398 entries,
399 dry_run: false,
400 labels: HashMap::default(),
401 partial_success: true,
402 resource: None,
403 });
404
405 if let Err(e) = self.authorizer.authorize(&mut req, &self.scopes).await {
406 otel_error!(name: "ExportAuthorizeError", error = format!("{:?}", TraceError::from(Error::Authorizer(e.into()))));
407 } else if let Err(e) = client.client.write_log_entries(req).await {
408 otel_error!(name: "ExportTransportError", error = format!("{:?}", TraceError::from(Error::Transport(e.into()))));
409 }
410 }
411}
412
413#[cfg(feature = "gcp-authorizer")]
414pub struct GcpAuthorizer {
415 provider: Arc<dyn gcp_auth::TokenProvider>,
416 project_id: Arc<str>,
417}
418
419#[cfg(feature = "gcp-authorizer")]
420impl GcpAuthorizer {
421 pub async fn new() -> Result<Self, Error> {
422 let provider = gcp_auth::provider()
423 .await
424 .map_err(|e| Error::Authorizer(e.into()))?;
425
426 let project_id = provider
427 .project_id()
428 .await
429 .map_err(|e| Error::Authorizer(e.into()))?;
430
431 Ok(Self {
432 provider,
433 project_id,
434 })
435 }
436 pub fn from_gcp_auth(provider: Arc<dyn gcp_auth::TokenProvider>, project_id: Arc<str>) -> Self {
437 Self {
438 provider,
439 project_id,
440 }
441 }
442}
443
444#[cfg(feature = "gcp-authorizer")]
445#[async_trait]
446impl Authorizer for GcpAuthorizer {
447 type Error = Error;
448
449 fn project_id(&self) -> &str {
450 &self.project_id
451 }
452
453 async fn authorize<T: Send + Sync>(
454 &self,
455 req: &mut Request<T>,
456 scopes: &[&str],
457 ) -> Result<(), Self::Error> {
458 let token = self
459 .provider
460 .token(scopes)
461 .await
462 .map_err(|e| Error::Authorizer(e.into()))?;
463
464 req.metadata_mut().insert(
465 "authorization",
466 MetadataValue::try_from(format!("Bearer {}", token.as_str())).unwrap(),
467 );
468
469 Ok(())
470 }
471}
472
473#[async_trait]
474pub trait Authorizer: Sync + Send + 'static {
475 type Error: std::error::Error + fmt::Debug + Send + Sync;
476
477 fn project_id(&self) -> &str;
478 async fn authorize<T: Send + Sync>(
479 &self,
480 request: &mut Request<T>,
481 scopes: &[&str],
482 ) -> Result<(), Self::Error>;
483}
484
485impl From<Value> for AttributeValue {
486 fn from(v: Value) -> AttributeValue {
487 use proto::devtools::cloudtrace::v2::attribute_value;
488 let new_value = match v {
489 Value::Bool(v) => attribute_value::Value::BoolValue(v),
490 Value::F64(v) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
491 Value::I64(v) => attribute_value::Value::IntValue(v),
492 Value::String(v) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
493 Value::Array(_) => attribute_value::Value::StringValue(to_truncate(v.to_string())),
494 _ => attribute_value::Value::StringValue(to_truncate("".to_string())),
495 };
496 AttributeValue {
497 value: Some(new_value),
498 }
499 }
500}
501
502fn to_truncate(s: String) -> TruncatableString {
503 TruncatableString {
504 value: s,
505 ..Default::default()
506 }
507}
508
509#[derive(Debug, Error)]
510pub enum Error {
511 #[error("authorizer error: {0}")]
512 Authorizer(#[source] Box<dyn std::error::Error + Send + Sync>),
513 #[error("I/O error: {0}")]
514 Io(#[from] std::io::Error),
515 #[error("{0}")]
516 Other(#[from] Box<dyn std::error::Error + Send + Sync>),
517 #[error("tonic error: {0}")]
518 Transport(#[source] Box<dyn std::error::Error + Send + Sync>),
519}
520
521impl opentelemetry::trace::ExportError for Error {
522 fn exporter_name(&self) -> &'static str {
523 "stackdriver"
524 }
525}
526
527enum LogSeverity {
529 Default = 0,
530 Debug = 100,
531 Info = 200,
532 Warning = 400,
533 Error = 500,
534}
535
536#[derive(Clone)]
537struct LogClient {
538 client: LoggingServiceV2Client<Channel>,
539 context: Arc<InternalLogContext>,
540}
541
542struct InternalLogContext {
543 log_id: String,
544 resource: proto::api::MonitoredResource,
545}
546
547#[derive(Clone)]
548pub struct LogContext {
549 pub log_id: String,
550 pub resource: MonitoredResource,
551}
552
553impl From<LogContext> for InternalLogContext {
554 fn from(cx: LogContext) -> Self {
555 let mut labels = HashMap::default();
556 let resource = match cx.resource {
557 MonitoredResource::CloudRunJob {
558 project_id,
559 job_name,
560 location,
561 } => {
562 labels.insert("project_id".to_string(), project_id);
563 if let Some(job_name) = job_name {
564 labels.insert("job_name".to_string(), job_name);
565 }
566 if let Some(location) = location {
567 labels.insert("location".to_string(), location);
568 }
569
570 proto::api::MonitoredResource {
571 r#type: "cloud_run_job".to_owned(),
572 labels,
573 }
574 }
575 MonitoredResource::CloudRunRevision {
576 project_id,
577 service_name,
578 revision_name,
579 location,
580 configuration_name,
581 } => {
582 labels.insert("project_id".to_string(), project_id);
583 if let Some(service_name) = service_name {
584 labels.insert("service_name".to_string(), service_name);
585 }
586 if let Some(revision_name) = revision_name {
587 labels.insert("revision_name".to_string(), revision_name);
588 }
589 if let Some(location) = location {
590 labels.insert("location".to_string(), location);
591 }
592 if let Some(configuration_name) = configuration_name {
593 labels.insert("configuration_name".to_string(), configuration_name);
594 }
595
596 proto::api::MonitoredResource {
597 r#type: "cloud_run_revision".to_owned(),
598 labels,
599 }
600 }
601 MonitoredResource::GenericNode {
602 project_id,
603 location,
604 namespace,
605 node_id,
606 } => {
607 labels.insert("project_id".to_string(), project_id);
608 if let Some(location) = location {
609 labels.insert("location".to_string(), location);
610 }
611 if let Some(namespace) = namespace {
612 labels.insert("namespace".to_string(), namespace);
613 }
614 if let Some(node_id) = node_id {
615 labels.insert("node_id".to_string(), node_id);
616 }
617
618 proto::api::MonitoredResource {
619 r#type: "generic_node".to_owned(),
620 labels,
621 }
622 }
623 MonitoredResource::GenericTask {
624 project_id,
625 location,
626 namespace,
627 job,
628 task_id,
629 } => {
630 labels.insert("project_id".to_owned(), project_id);
631 if let Some(location) = location {
632 labels.insert("location".to_owned(), location);
633 }
634 if let Some(namespace) = namespace {
635 labels.insert("namespace".to_owned(), namespace);
636 }
637 if let Some(job) = job {
638 labels.insert("job".to_owned(), job);
639 }
640 if let Some(task_id) = task_id {
641 labels.insert("task_id".to_owned(), task_id);
642 }
643
644 proto::api::MonitoredResource {
645 r#type: "generic_task".to_owned(),
646 labels,
647 }
648 }
649 MonitoredResource::Global { project_id } => {
650 labels.insert("project_id".to_owned(), project_id);
651 proto::api::MonitoredResource {
652 r#type: "global".to_owned(),
653 labels,
654 }
655 }
656 };
657
658 Self {
659 log_id: cx.log_id,
660 resource,
661 }
662 }
663}
664
665#[derive(Clone)]
670pub enum MonitoredResource {
671 Global {
672 project_id: String,
673 },
674 GenericNode {
675 project_id: String,
676 location: Option<String>,
677 namespace: Option<String>,
678 node_id: Option<String>,
679 },
680 GenericTask {
681 project_id: String,
682 location: Option<String>,
683 namespace: Option<String>,
684 job: Option<String>,
685 task_id: Option<String>,
686 },
687 CloudRunJob {
688 project_id: String,
689 job_name: Option<String>,
690 location: Option<String>,
691 },
692 CloudRunRevision {
693 project_id: String,
694 service_name: Option<String>,
695 revision_name: Option<String>,
696 location: Option<String>,
697 configuration_name: Option<String>,
698 },
699}
700
701impl Attributes {
702 fn new(attributes: Vec<KeyValue>, resource: Option<&Resource>) -> Self {
706 let mut new = Self {
707 dropped_attributes_count: 0,
708 attribute_map: HashMap::with_capacity(Ord::min(
709 MAX_ATTRIBUTES_PER_SPAN,
710 attributes.len() + resource.map_or(0, |r| r.len()),
711 )),
712 };
713
714 if let Some(resource) = resource {
715 for (k, v) in resource.iter() {
716 new.push(Cow::Borrowed(k), Cow::Borrowed(v));
717 }
718 }
719
720 for kv in attributes {
721 new.push(Cow::Owned(kv.key), Cow::Owned(kv.value));
722 }
723
724 new
725 }
726
727 fn push(&mut self, key: Cow<'_, Key>, value: Cow<'_, Value>) {
728 if self.attribute_map.len() >= MAX_ATTRIBUTES_PER_SPAN {
729 self.dropped_attributes_count += 1;
730 return;
731 }
732
733 let key_str = key.as_str();
734 if key_str.len() > 128 {
735 self.dropped_attributes_count += 1;
736 return;
737 }
738
739 for (otel_key, gcp_key) in KEY_MAP {
740 if otel_key == key_str {
741 self.attribute_map
742 .insert(gcp_key.to_owned(), value.into_owned().into());
743 return;
744 }
745 }
746
747 self.attribute_map.insert(
748 match key {
749 Cow::Owned(k) => k.to_string(),
750 Cow::Borrowed(k) => k.to_string(),
751 },
752 value.into_owned().into(),
753 );
754 }
755}
756
757fn transform_links(links: &opentelemetry_sdk::trace::SpanLinks) -> Option<Links> {
758 if links.is_empty() {
759 return None;
760 }
761
762 Some(Links {
763 dropped_links_count: links.dropped_count as i32,
764 link: links
765 .iter()
766 .map(|link| Link {
767 trace_id: hex::encode(link.span_context.trace_id().to_bytes()),
768 span_id: hex::encode(link.span_context.span_id().to_bytes()),
769 ..Default::default()
770 })
771 .collect(),
772 })
773}
774
775const KEY_MAP: [(&str, &str); 19] = [
779 (HTTP_PATH, GCP_HTTP_PATH),
780 (semconv::attribute::HTTP_HOST, "/http/host"),
781 ("http.request.header.host", "/http/host"),
782 (semconv::attribute::HTTP_METHOD, "/http/method"),
783 (semconv::attribute::HTTP_REQUEST_METHOD, "/http/method"),
784 (semconv::attribute::HTTP_TARGET, "/http/path"),
785 (semconv::attribute::URL_PATH, "/http/path"),
786 (semconv::attribute::HTTP_URL, "/http/url"),
787 (semconv::attribute::URL_FULL, "/http/url"),
788 (semconv::attribute::HTTP_USER_AGENT, "/http/user_agent"),
789 (semconv::attribute::USER_AGENT_ORIGINAL, "/http/user_agent"),
790 (semconv::attribute::HTTP_STATUS_CODE, "/http/status_code"),
791 (
793 semconv::attribute::HTTP_RESPONSE_STATUS_CODE,
794 "/http/status_code",
795 ),
796 (
797 semconv::attribute::K8S_CLUSTER_NAME,
798 "g.co/r/k8s_container/cluster_name",
799 ),
800 (
801 semconv::attribute::K8S_NAMESPACE_NAME,
802 "g.co/r/k8s_container/namespace",
803 ),
804 (
805 semconv::attribute::K8S_POD_NAME,
806 "g.co/r/k8s_container/pod_name",
807 ),
808 (
809 semconv::attribute::K8S_CONTAINER_NAME,
810 "g.co/r/k8s_container/container_name",
811 ),
812 (semconv::trace::HTTP_ROUTE, "/http/route"),
813 (HTTP_PATH, GCP_HTTP_PATH),
814];
815
816const HTTP_PATH: &str = "http.path";
817const GCP_HTTP_PATH: &str = "/http/path";
818
819impl From<opentelemetry::trace::SpanKind> for SpanKind {
820 fn from(span_kind: opentelemetry::trace::SpanKind) -> Self {
821 match span_kind {
822 opentelemetry::trace::SpanKind::Client => SpanKind::Client,
823 opentelemetry::trace::SpanKind::Server => SpanKind::Server,
824 opentelemetry::trace::SpanKind::Producer => SpanKind::Producer,
825 opentelemetry::trace::SpanKind::Consumer => SpanKind::Consumer,
826 opentelemetry::trace::SpanKind::Internal => SpanKind::Internal,
827 }
828 }
829}
830
831fn status(value: opentelemetry::trace::Status) -> Option<Status> {
832 match value {
833 opentelemetry::trace::Status::Ok => Some(Status {
834 code: Code::Ok as i32,
835 message: "".to_owned(),
836 details: vec![],
837 }),
838 opentelemetry::trace::Status::Unset => None,
839 opentelemetry::trace::Status::Error { description } => Some(Status {
840 code: Code::Unknown as i32,
841 message: description.into(),
842 details: vec![],
843 }),
844 }
845}
846const TRACE_APPEND: &str = "https://www.googleapis.com/auth/trace.append";
847const LOGGING_WRITE: &str = "https://www.googleapis.com/auth/logging.write";
848const MAX_ATTRIBUTES_PER_SPAN: usize = 32;
849
850#[cfg(test)]
851mod tests {
852 use super::*;
853 use opentelemetry::{KeyValue, Value};
854 use opentelemetry_semantic_conventions as semcov;
855
856 #[test]
857 fn test_attributes_mapping() {
858 let capacity = 10;
859 let mut attributes = Vec::with_capacity(capacity);
860
861 attributes.push(KeyValue::new(
863 semconv::attribute::HTTP_HOST,
864 "example.com:8080",
865 ));
866
867 attributes.push(KeyValue::new(semcov::attribute::HTTP_METHOD, "POST"));
869
870 attributes.push(KeyValue::new(HTTP_PATH, "/path/12314/?q=ddds#123"));
872
873 attributes.push(KeyValue::new(
875 semcov::attribute::HTTP_URL,
876 "https://example.com:8080/webshop/articles/4?s=1",
877 ));
878
879 attributes.push(KeyValue::new(
881 semconv::attribute::HTTP_USER_AGENT,
882 "CERN-LineMode/2.15 libwww/2.17b3",
883 ));
884
885 attributes.push(KeyValue::new(semcov::attribute::HTTP_STATUS_CODE, 200i64));
887
888 attributes.push(KeyValue::new(
890 semcov::trace::HTTP_ROUTE,
891 "/webshop/articles/:article_id",
892 ));
893
894 let resources = Resource::builder_empty()
896 .with_attributes([KeyValue::new(
897 semcov::resource::SERVICE_NAME,
898 "Test Service Name",
899 )])
900 .build();
901
902 let actual = Attributes::new(attributes, Some(&resources));
903 assert_eq!(actual.attribute_map.len(), 8);
904 assert_eq!(actual.dropped_attributes_count, 0);
905 assert_eq!(
906 actual.attribute_map.get("/http/host"),
907 Some(&AttributeValue::from(Value::String(
908 "example.com:8080".into()
909 )))
910 );
911 assert_eq!(
912 actual.attribute_map.get("/http/method"),
913 Some(&AttributeValue::from(Value::String("POST".into()))),
914 );
915 assert_eq!(
916 actual.attribute_map.get("/http/path"),
917 Some(&AttributeValue::from(Value::String(
918 "/path/12314/?q=ddds#123".into()
919 ))),
920 );
921 assert_eq!(
922 actual.attribute_map.get("/http/route"),
923 Some(&AttributeValue::from(Value::String(
924 "/webshop/articles/:article_id".into()
925 ))),
926 );
927 assert_eq!(
928 actual.attribute_map.get("/http/url"),
929 Some(&AttributeValue::from(Value::String(
930 "https://example.com:8080/webshop/articles/4?s=1".into(),
931 ))),
932 );
933 assert_eq!(
934 actual.attribute_map.get("/http/user_agent"),
935 Some(&AttributeValue::from(Value::String(
936 "CERN-LineMode/2.15 libwww/2.17b3".into()
937 ))),
938 );
939 assert_eq!(
940 actual.attribute_map.get("/http/status_code"),
941 Some(&AttributeValue::from(Value::I64(200))),
942 );
943 }
944
945 #[test]
946 fn test_too_many() {
947 let resources = Resource::builder_empty()
948 .with_attributes([KeyValue::new(
949 semconv::attribute::USER_AGENT_ORIGINAL,
950 "Test Service Name UA",
951 )])
952 .build();
953 let mut attributes = Vec::with_capacity(32);
954 for i in 0..32 {
955 attributes.push(KeyValue::new(
956 format!("key{}", i),
957 Value::String(format!("value{}", i).into()),
958 ));
959 }
960
961 let actual = Attributes::new(attributes, Some(&resources));
962 assert_eq!(actual.attribute_map.len(), 32);
963 assert_eq!(actual.dropped_attributes_count, 1);
964 assert_eq!(
965 actual.attribute_map.get("/http/user_agent"),
966 Some(&AttributeValue::from(Value::String(
967 "Test Service Name UA".into()
968 ))),
969 );
970 }
971
972 #[test]
973 fn test_attributes_mapping_http_target() {
974 let attributes = vec![KeyValue::new(
975 semcov::attribute::HTTP_TARGET,
976 "/path/12314/?q=ddds#123",
977 )];
978
979 let resources = Resource::builder_empty().with_attributes([]).build();
982 let actual = Attributes::new(attributes, Some(&resources));
983 assert_eq!(actual.attribute_map.len(), 1);
984 assert_eq!(actual.dropped_attributes_count, 0);
985 assert_eq!(
986 actual.attribute_map.get("/http/path"),
987 Some(&AttributeValue::from(Value::String(
988 "/path/12314/?q=ddds#123".into()
989 ))),
990 );
991 }
992
993 #[test]
994 fn test_attributes_mapping_dropped_attributes_count() {
995 let attributes = vec![KeyValue::new("answer", Value::I64(42)),KeyValue::new("long_attribute_key_dvwmacxpeefbuemoxljmqvldjxmvvihoeqnuqdsyovwgljtnemouidabhkmvsnauwfnaihekcfwhugejboiyfthyhmkpsaxtidlsbwsmirebax", Value::String("Some value".into()))];
996
997 let resources = Resource::builder_empty().with_attributes([]).build();
998 let actual = Attributes::new(attributes, Some(&resources));
999 assert_eq!(
1000 actual,
1001 Attributes {
1002 attribute_map: HashMap::from([(
1003 "answer".into(),
1004 AttributeValue::from(Value::I64(42))
1005 ),]),
1006 dropped_attributes_count: 1,
1007 }
1008 );
1009 assert_eq!(actual.attribute_map.len(), 1);
1010 assert_eq!(actual.dropped_attributes_count, 1);
1011 }
1012}