opentelemetry_proto/transform/
trace.rs1#[cfg(feature = "gen-tonic-messages")]
2pub mod tonic {
3 use crate::proto::tonic::resource::v1::Resource;
4 use crate::proto::tonic::trace::v1::{span, status, ResourceSpans, ScopeSpans, Span, Status};
5 use crate::transform::common::{
6 to_nanos,
7 tonic::{Attributes, ResourceAttributesWithSchema},
8 };
9 use opentelemetry::trace;
10 use opentelemetry::trace::{Link, SpanId, SpanKind};
11 use opentelemetry_sdk::trace::SpanData;
12 use std::collections::HashMap;
13
14 impl From<SpanKind> for span::SpanKind {
15 fn from(span_kind: SpanKind) -> Self {
16 match span_kind {
17 SpanKind::Client => span::SpanKind::Client,
18 SpanKind::Consumer => span::SpanKind::Consumer,
19 SpanKind::Internal => span::SpanKind::Internal,
20 SpanKind::Producer => span::SpanKind::Producer,
21 SpanKind::Server => span::SpanKind::Server,
22 }
23 }
24 }
25
26 impl From<&trace::Status> for status::StatusCode {
27 fn from(status: &trace::Status) -> Self {
28 match status {
29 trace::Status::Ok => status::StatusCode::Ok,
30 trace::Status::Unset => status::StatusCode::Unset,
31 trace::Status::Error { .. } => status::StatusCode::Error,
32 }
33 }
34 }
35
36 impl From<Link> for span::Link {
37 fn from(link: Link) -> Self {
38 span::Link {
39 trace_id: link.span_context.trace_id().to_bytes().to_vec(),
40 span_id: link.span_context.span_id().to_bytes().to_vec(),
41 trace_state: link.span_context.trace_state().header(),
42 attributes: Attributes::from(link.attributes).0,
43 dropped_attributes_count: link.dropped_attributes_count,
44 flags: link.span_context.trace_flags().to_u8() as u32,
45 }
46 }
47 }
48 impl From<opentelemetry_sdk::trace::SpanData> for Span {
49 fn from(source_span: opentelemetry_sdk::trace::SpanData) -> Self {
50 let span_kind: span::SpanKind = source_span.span_kind.into();
51 Span {
52 trace_id: source_span.span_context.trace_id().to_bytes().to_vec(),
53 span_id: source_span.span_context.span_id().to_bytes().to_vec(),
54 trace_state: source_span.span_context.trace_state().header(),
55 parent_span_id: {
56 if source_span.parent_span_id != SpanId::INVALID {
57 source_span.parent_span_id.to_bytes().to_vec()
58 } else {
59 vec![]
60 }
61 },
62 flags: source_span.span_context.trace_flags().to_u8() as u32,
63 name: source_span.name.into_owned(),
64 kind: span_kind as i32,
65 start_time_unix_nano: to_nanos(source_span.start_time),
66 end_time_unix_nano: to_nanos(source_span.end_time),
67 dropped_attributes_count: source_span.dropped_attributes_count,
68 attributes: Attributes::from(source_span.attributes).0,
69 dropped_events_count: source_span.events.dropped_count,
70 events: source_span
71 .events
72 .into_iter()
73 .map(|event| span::Event {
74 time_unix_nano: to_nanos(event.timestamp),
75 name: event.name.into(),
76 attributes: Attributes::from(event.attributes).0,
77 dropped_attributes_count: event.dropped_attributes_count,
78 })
79 .collect(),
80 dropped_links_count: source_span.links.dropped_count,
81 links: source_span.links.into_iter().map(Into::into).collect(),
82 status: Some(Status {
83 code: status::StatusCode::from(&source_span.status).into(),
84 message: match source_span.status {
85 trace::Status::Error { description } => description.to_string(),
86 _ => Default::default(),
87 },
88 }),
89 }
90 }
91 }
92
93 impl ResourceSpans {
94 pub fn new(source_span: SpanData, resource: &ResourceAttributesWithSchema) -> Self {
95 let span_kind: span::SpanKind = source_span.span_kind.into();
96 ResourceSpans {
97 resource: Some(Resource {
98 attributes: resource.attributes.0.clone(),
99 dropped_attributes_count: 0,
100 entity_refs: vec![],
101 }),
102 schema_url: resource.schema_url.clone().unwrap_or_default(),
103 scope_spans: vec![ScopeSpans {
104 schema_url: source_span
105 .instrumentation_scope
106 .schema_url()
107 .map(ToOwned::to_owned)
108 .unwrap_or_default(),
109 scope: Some((source_span.instrumentation_scope, None).into()),
110 spans: vec![Span {
111 trace_id: source_span.span_context.trace_id().to_bytes().to_vec(),
112 span_id: source_span.span_context.span_id().to_bytes().to_vec(),
113 trace_state: source_span.span_context.trace_state().header(),
114 parent_span_id: {
115 if source_span.parent_span_id != SpanId::INVALID {
116 source_span.parent_span_id.to_bytes().to_vec()
117 } else {
118 vec![]
119 }
120 },
121 flags: source_span.span_context.trace_flags().to_u8() as u32,
122 name: source_span.name.into_owned(),
123 kind: span_kind as i32,
124 start_time_unix_nano: to_nanos(source_span.start_time),
125 end_time_unix_nano: to_nanos(source_span.end_time),
126 dropped_attributes_count: source_span.dropped_attributes_count,
127 attributes: Attributes::from(source_span.attributes).0,
128 dropped_events_count: source_span.events.dropped_count,
129 events: source_span
130 .events
131 .into_iter()
132 .map(|event| span::Event {
133 time_unix_nano: to_nanos(event.timestamp),
134 name: event.name.into(),
135 attributes: Attributes::from(event.attributes).0,
136 dropped_attributes_count: event.dropped_attributes_count,
137 })
138 .collect(),
139 dropped_links_count: source_span.links.dropped_count,
140 links: source_span.links.into_iter().map(Into::into).collect(),
141 status: Some(Status {
142 code: status::StatusCode::from(&source_span.status).into(),
143 message: match source_span.status {
144 trace::Status::Error { description } => description.to_string(),
145 _ => Default::default(),
146 },
147 }),
148 }],
149 }],
150 }
151 }
152 }
153
154 pub fn group_spans_by_resource_and_scope(
155 spans: Vec<SpanData>,
156 resource: &ResourceAttributesWithSchema,
157 ) -> Vec<ResourceSpans> {
158 let scope_map = spans.iter().fold(
160 HashMap::new(),
161 |mut scope_map: HashMap<&opentelemetry::InstrumentationScope, Vec<&SpanData>>, span| {
162 let instrumentation = &span.instrumentation_scope;
163 scope_map.entry(instrumentation).or_default().push(span);
164 scope_map
165 },
166 );
167
168 let scope_spans = scope_map
170 .into_iter()
171 .map(|(instrumentation, span_records)| ScopeSpans {
172 scope: Some((instrumentation, None).into()),
173 schema_url: resource.schema_url.clone().unwrap_or_default(),
174 spans: span_records
175 .into_iter()
176 .map(|span_data| span_data.clone().into())
177 .collect(),
178 })
179 .collect();
180
181 vec![ResourceSpans {
183 resource: Some(Resource {
184 attributes: resource.attributes.0.clone(),
185 dropped_attributes_count: 0,
186 entity_refs: vec![],
187 }),
188 scope_spans,
189 schema_url: resource.schema_url.clone().unwrap_or_default(),
190 }]
191 }
192}
193
194#[cfg(test)]
195mod tests {
196 use crate::tonic::common::v1::any_value::Value;
197 use crate::transform::common::tonic::ResourceAttributesWithSchema;
198 use opentelemetry::time::now;
199 use opentelemetry::trace::{
200 SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
201 };
202 use opentelemetry::InstrumentationScope;
203 use opentelemetry::KeyValue;
204 use opentelemetry_sdk::resource::Resource;
205 use opentelemetry_sdk::trace::SpanData;
206 use opentelemetry_sdk::trace::{SpanEvents, SpanLinks};
207 use std::borrow::Cow;
208 use std::time::Duration;
209
210 fn create_test_span_data(instrumentation_name: &'static str) -> SpanData {
211 let span_context = SpanContext::new(
212 TraceId::from_u128(123),
213 SpanId::from_u64(456),
214 TraceFlags::default(),
215 false,
216 TraceState::default(),
217 );
218
219 SpanData {
220 span_context,
221 parent_span_id: SpanId::from_u64(0),
222 span_kind: SpanKind::Internal,
223 name: Cow::Borrowed("test_span"),
224 start_time: now(),
225 end_time: now() + Duration::from_secs(1),
226 attributes: vec![KeyValue::new("key", "value")],
227 dropped_attributes_count: 0,
228 events: SpanEvents::default(),
229 links: SpanLinks::default(),
230 status: Status::Unset,
231 instrumentation_scope: InstrumentationScope::builder(instrumentation_name).build(),
232 }
233 }
234
/// A single span must come back as one `ResourceSpans` containing one
/// `ScopeSpans` ("lib1") with that one span, plus the resource attribute.
#[test]
fn test_group_spans_by_resource_and_scope_single_scope() {
    // Arrange: one resource attribute and one span from scope "lib1".
    let span_data = create_test_span_data("lib1");
    let sdk_resource = Resource::builder_empty()
        .with_attribute(KeyValue::new("resource_key", "resource_value"))
        .build();
    let resource: ResourceAttributesWithSchema = (&sdk_resource).into();

    // Act.
    let grouped_spans = crate::transform::trace::tonic::group_spans_by_resource_and_scope(
        vec![span_data.clone()],
        &resource,
    );

    // Assert: exactly one ResourceSpans carrying the resource attribute.
    assert_eq!(grouped_spans.len(), 1);
    let resource_spans = &grouped_spans[0];

    let resource_attributes = &resource_spans.resource.as_ref().unwrap().attributes;
    assert_eq!(resource_attributes.len(), 1);
    assert_eq!(resource_attributes[0].key, "resource_key");
    assert_eq!(
        resource_attributes[0].value.clone().unwrap().value.unwrap(),
        Value::StringValue("resource_value".to_string())
    );

    // Assert: a single scope holding the single span.
    let scope_spans = &resource_spans.scope_spans;
    assert_eq!(scope_spans.len(), 1);

    let only_scope = &scope_spans[0];
    assert_eq!(only_scope.scope.as_ref().unwrap().name, "lib1");
    assert_eq!(only_scope.spans.len(), 1);

    let expected_trace_id = span_data.span_context.trace_id().to_bytes().to_vec();
    assert_eq!(only_scope.spans[0].trace_id, expected_trace_id);
}
281
/// Three spans from two scopes ("lib1" x2, "lib2" x1) must come back as one
/// `ResourceSpans` with two `ScopeSpans`, each holding its own spans.
#[test]
fn test_group_spans_by_resource_and_scope_multiple_scopes() {
    // Arrange.
    let span_data1 = create_test_span_data("lib1");
    let span_data2 = create_test_span_data("lib1");
    let span_data3 = create_test_span_data("lib2");
    let sdk_resource = Resource::builder_empty()
        .with_attribute(KeyValue::new("resource_key", "resource_value"))
        .build();
    let resource: ResourceAttributesWithSchema = (&sdk_resource).into();

    // Act.
    let grouped_spans = crate::transform::trace::tonic::group_spans_by_resource_and_scope(
        vec![span_data1.clone(), span_data2.clone(), span_data3.clone()],
        &resource,
    );

    // Assert: exactly one ResourceSpans carrying the resource attribute.
    assert_eq!(grouped_spans.len(), 1);
    let resource_spans = &grouped_spans[0];

    let resource_attributes = &resource_spans.resource.as_ref().unwrap().attributes;
    assert_eq!(resource_attributes.len(), 1);
    assert_eq!(resource_attributes[0].key, "resource_key");
    assert_eq!(
        resource_attributes[0].value.clone().unwrap().value.unwrap(),
        Value::StringValue("resource_value".to_string())
    );

    // Assert: one ScopeSpans per scope. Their order is not relied upon, so
    // each one is looked up by scope name.
    let scope_spans = &resource_spans.scope_spans;
    assert_eq!(scope_spans.len(), 2);

    let lib1_scope_span = scope_spans
        .iter()
        .find(|candidate| candidate.scope.as_ref().unwrap().name == "lib1")
        .expect("lib1 scope span not found");
    let lib2_scope_span = scope_spans
        .iter()
        .find(|candidate| candidate.scope.as_ref().unwrap().name == "lib2")
        .expect("lib2 scope span not found");

    assert_eq!(lib1_scope_span.scope.as_ref().unwrap().name, "lib1");
    assert_eq!(lib2_scope_span.scope.as_ref().unwrap().name, "lib2");

    assert_eq!(lib1_scope_span.spans.len(), 2);
    assert_eq!(lib2_scope_span.spans.len(), 1);

    // Assert: every exported span carries the trace id of its source span.
    let expected_pairs = [
        (&lib1_scope_span.spans[0], &span_data1),
        (&lib1_scope_span.spans[1], &span_data2),
        (&lib2_scope_span.spans[0], &span_data3),
    ];
    for (exported, source) in expected_pairs {
        assert_eq!(
            exported.trace_id,
            source.span_context.trace_id().to_bytes().to_vec()
        );
    }
}
355}