opentelemetry_proto/transform/
trace.rs

1#[cfg(feature = "gen-tonic-messages")]
2pub mod tonic {
3    use crate::proto::tonic::resource::v1::Resource;
4    use crate::proto::tonic::trace::v1::{span, status, ResourceSpans, ScopeSpans, Span, Status};
5    use crate::transform::common::{
6        to_nanos,
7        tonic::{Attributes, ResourceAttributesWithSchema},
8    };
9    use opentelemetry::trace;
10    use opentelemetry::trace::{Link, SpanId, SpanKind};
11    use opentelemetry_sdk::trace::SpanData;
12    use std::collections::HashMap;
13
14    impl From<SpanKind> for span::SpanKind {
15        fn from(span_kind: SpanKind) -> Self {
16            match span_kind {
17                SpanKind::Client => span::SpanKind::Client,
18                SpanKind::Consumer => span::SpanKind::Consumer,
19                SpanKind::Internal => span::SpanKind::Internal,
20                SpanKind::Producer => span::SpanKind::Producer,
21                SpanKind::Server => span::SpanKind::Server,
22            }
23        }
24    }
25
26    impl From<&trace::Status> for status::StatusCode {
27        fn from(status: &trace::Status) -> Self {
28            match status {
29                trace::Status::Ok => status::StatusCode::Ok,
30                trace::Status::Unset => status::StatusCode::Unset,
31                trace::Status::Error { .. } => status::StatusCode::Error,
32            }
33        }
34    }
35
36    impl From<Link> for span::Link {
37        fn from(link: Link) -> Self {
38            span::Link {
39                trace_id: link.span_context.trace_id().to_bytes().to_vec(),
40                span_id: link.span_context.span_id().to_bytes().to_vec(),
41                trace_state: link.span_context.trace_state().header(),
42                attributes: Attributes::from(link.attributes).0,
43                dropped_attributes_count: link.dropped_attributes_count,
44                flags: link.span_context.trace_flags().to_u8() as u32,
45            }
46        }
47    }
48    impl From<opentelemetry_sdk::trace::SpanData> for Span {
49        fn from(source_span: opentelemetry_sdk::trace::SpanData) -> Self {
50            let span_kind: span::SpanKind = source_span.span_kind.into();
51            Span {
52                trace_id: source_span.span_context.trace_id().to_bytes().to_vec(),
53                span_id: source_span.span_context.span_id().to_bytes().to_vec(),
54                trace_state: source_span.span_context.trace_state().header(),
55                parent_span_id: {
56                    if source_span.parent_span_id != SpanId::INVALID {
57                        source_span.parent_span_id.to_bytes().to_vec()
58                    } else {
59                        vec![]
60                    }
61                },
62                flags: source_span.span_context.trace_flags().to_u8() as u32,
63                name: source_span.name.into_owned(),
64                kind: span_kind as i32,
65                start_time_unix_nano: to_nanos(source_span.start_time),
66                end_time_unix_nano: to_nanos(source_span.end_time),
67                dropped_attributes_count: source_span.dropped_attributes_count,
68                attributes: Attributes::from(source_span.attributes).0,
69                dropped_events_count: source_span.events.dropped_count,
70                events: source_span
71                    .events
72                    .into_iter()
73                    .map(|event| span::Event {
74                        time_unix_nano: to_nanos(event.timestamp),
75                        name: event.name.into(),
76                        attributes: Attributes::from(event.attributes).0,
77                        dropped_attributes_count: event.dropped_attributes_count,
78                    })
79                    .collect(),
80                dropped_links_count: source_span.links.dropped_count,
81                links: source_span.links.into_iter().map(Into::into).collect(),
82                status: Some(Status {
83                    code: status::StatusCode::from(&source_span.status).into(),
84                    message: match source_span.status {
85                        trace::Status::Error { description } => description.to_string(),
86                        _ => Default::default(),
87                    },
88                }),
89            }
90        }
91    }
92
93    impl ResourceSpans {
94        pub fn new(source_span: SpanData, resource: &ResourceAttributesWithSchema) -> Self {
95            let span_kind: span::SpanKind = source_span.span_kind.into();
96            ResourceSpans {
97                resource: Some(Resource {
98                    attributes: resource.attributes.0.clone(),
99                    dropped_attributes_count: 0,
100                    entity_refs: vec![],
101                }),
102                schema_url: resource.schema_url.clone().unwrap_or_default(),
103                scope_spans: vec![ScopeSpans {
104                    schema_url: source_span
105                        .instrumentation_scope
106                        .schema_url()
107                        .map(ToOwned::to_owned)
108                        .unwrap_or_default(),
109                    scope: Some((source_span.instrumentation_scope, None).into()),
110                    spans: vec![Span {
111                        trace_id: source_span.span_context.trace_id().to_bytes().to_vec(),
112                        span_id: source_span.span_context.span_id().to_bytes().to_vec(),
113                        trace_state: source_span.span_context.trace_state().header(),
114                        parent_span_id: {
115                            if source_span.parent_span_id != SpanId::INVALID {
116                                source_span.parent_span_id.to_bytes().to_vec()
117                            } else {
118                                vec![]
119                            }
120                        },
121                        flags: source_span.span_context.trace_flags().to_u8() as u32,
122                        name: source_span.name.into_owned(),
123                        kind: span_kind as i32,
124                        start_time_unix_nano: to_nanos(source_span.start_time),
125                        end_time_unix_nano: to_nanos(source_span.end_time),
126                        dropped_attributes_count: source_span.dropped_attributes_count,
127                        attributes: Attributes::from(source_span.attributes).0,
128                        dropped_events_count: source_span.events.dropped_count,
129                        events: source_span
130                            .events
131                            .into_iter()
132                            .map(|event| span::Event {
133                                time_unix_nano: to_nanos(event.timestamp),
134                                name: event.name.into(),
135                                attributes: Attributes::from(event.attributes).0,
136                                dropped_attributes_count: event.dropped_attributes_count,
137                            })
138                            .collect(),
139                        dropped_links_count: source_span.links.dropped_count,
140                        links: source_span.links.into_iter().map(Into::into).collect(),
141                        status: Some(Status {
142                            code: status::StatusCode::from(&source_span.status).into(),
143                            message: match source_span.status {
144                                trace::Status::Error { description } => description.to_string(),
145                                _ => Default::default(),
146                            },
147                        }),
148                    }],
149                }],
150            }
151        }
152    }
153
154    pub fn group_spans_by_resource_and_scope(
155        spans: Vec<SpanData>,
156        resource: &ResourceAttributesWithSchema,
157    ) -> Vec<ResourceSpans> {
158        // Group spans by their instrumentation scope
159        let scope_map = spans.iter().fold(
160            HashMap::new(),
161            |mut scope_map: HashMap<&opentelemetry::InstrumentationScope, Vec<&SpanData>>, span| {
162                let instrumentation = &span.instrumentation_scope;
163                scope_map.entry(instrumentation).or_default().push(span);
164                scope_map
165            },
166        );
167
168        // Convert the grouped spans into ScopeSpans
169        let scope_spans = scope_map
170            .into_iter()
171            .map(|(instrumentation, span_records)| ScopeSpans {
172                scope: Some((instrumentation, None).into()),
173                schema_url: resource.schema_url.clone().unwrap_or_default(),
174                spans: span_records
175                    .into_iter()
176                    .map(|span_data| span_data.clone().into())
177                    .collect(),
178            })
179            .collect();
180
181        // Wrap ScopeSpans into a single ResourceSpans
182        vec![ResourceSpans {
183            resource: Some(Resource {
184                attributes: resource.attributes.0.clone(),
185                dropped_attributes_count: 0,
186                entity_refs: vec![],
187            }),
188            scope_spans,
189            schema_url: resource.schema_url.clone().unwrap_or_default(),
190        }]
191    }
192}
193
194#[cfg(test)]
195mod tests {
196    use crate::tonic::common::v1::any_value::Value;
197    use crate::transform::common::tonic::ResourceAttributesWithSchema;
198    use opentelemetry::time::now;
199    use opentelemetry::trace::{
200        SpanContext, SpanId, SpanKind, Status, TraceFlags, TraceId, TraceState,
201    };
202    use opentelemetry::InstrumentationScope;
203    use opentelemetry::KeyValue;
204    use opentelemetry_sdk::resource::Resource;
205    use opentelemetry_sdk::trace::SpanData;
206    use opentelemetry_sdk::trace::{SpanEvents, SpanLinks};
207    use std::borrow::Cow;
208    use std::time::Duration;
209
210    fn create_test_span_data(instrumentation_name: &'static str) -> SpanData {
211        let span_context = SpanContext::new(
212            TraceId::from_u128(123),
213            SpanId::from_u64(456),
214            TraceFlags::default(),
215            false,
216            TraceState::default(),
217        );
218
219        SpanData {
220            span_context,
221            parent_span_id: SpanId::from_u64(0),
222            span_kind: SpanKind::Internal,
223            name: Cow::Borrowed("test_span"),
224            start_time: now(),
225            end_time: now() + Duration::from_secs(1),
226            attributes: vec![KeyValue::new("key", "value")],
227            dropped_attributes_count: 0,
228            events: SpanEvents::default(),
229            links: SpanLinks::default(),
230            status: Status::Unset,
231            instrumentation_scope: InstrumentationScope::builder(instrumentation_name).build(),
232        }
233    }
234
235    #[test]
236    fn test_group_spans_by_resource_and_scope_single_scope() {
237        let resource = Resource::builder_empty()
238            .with_attribute(KeyValue::new("resource_key", "resource_value"))
239            .build();
240        let span_data = create_test_span_data("lib1");
241
242        let spans = vec![span_data.clone()];
243        let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
244
245        let grouped_spans =
246            crate::transform::trace::tonic::group_spans_by_resource_and_scope(spans, &resource);
247
248        assert_eq!(grouped_spans.len(), 1);
249
250        let resource_spans = &grouped_spans[0];
251        assert_eq!(
252            resource_spans.resource.as_ref().unwrap().attributes.len(),
253            1
254        );
255        assert_eq!(
256            resource_spans.resource.as_ref().unwrap().attributes[0].key,
257            "resource_key"
258        );
259        assert_eq!(
260            resource_spans.resource.as_ref().unwrap().attributes[0]
261                .value
262                .clone()
263                .unwrap()
264                .value
265                .unwrap(),
266            Value::StringValue("resource_value".to_string())
267        );
268
269        let scope_spans = &resource_spans.scope_spans;
270        assert_eq!(scope_spans.len(), 1);
271
272        let scope_span = &scope_spans[0];
273        assert_eq!(scope_span.scope.as_ref().unwrap().name, "lib1");
274        assert_eq!(scope_span.spans.len(), 1);
275
276        assert_eq!(
277            scope_span.spans[0].trace_id,
278            span_data.span_context.trace_id().to_bytes().to_vec()
279        );
280    }
281
282    #[test]
283    fn test_group_spans_by_resource_and_scope_multiple_scopes() {
284        let resource = Resource::builder_empty()
285            .with_attribute(KeyValue::new("resource_key", "resource_value"))
286            .build();
287        let span_data1 = create_test_span_data("lib1");
288        let span_data2 = create_test_span_data("lib1");
289        let span_data3 = create_test_span_data("lib2");
290
291        let spans = vec![span_data1.clone(), span_data2.clone(), span_data3.clone()];
292        let resource: ResourceAttributesWithSchema = (&resource).into(); // Convert Resource to ResourceAttributesWithSchema
293
294        let grouped_spans =
295            crate::transform::trace::tonic::group_spans_by_resource_and_scope(spans, &resource);
296
297        assert_eq!(grouped_spans.len(), 1);
298
299        let resource_spans = &grouped_spans[0];
300        assert_eq!(
301            resource_spans.resource.as_ref().unwrap().attributes.len(),
302            1
303        );
304        assert_eq!(
305            resource_spans.resource.as_ref().unwrap().attributes[0].key,
306            "resource_key"
307        );
308        assert_eq!(
309            resource_spans.resource.as_ref().unwrap().attributes[0]
310                .value
311                .clone()
312                .unwrap()
313                .value
314                .unwrap(),
315            Value::StringValue("resource_value".to_string())
316        );
317
318        let scope_spans = &resource_spans.scope_spans;
319        assert_eq!(scope_spans.len(), 2);
320
321        // Check the scope spans for both lib1 and lib2
322        let mut lib1_scope_span = None;
323        let mut lib2_scope_span = None;
324
325        for scope_span in scope_spans {
326            match scope_span.scope.as_ref().unwrap().name.as_str() {
327                "lib1" => lib1_scope_span = Some(scope_span),
328                "lib2" => lib2_scope_span = Some(scope_span),
329                _ => {}
330            }
331        }
332
333        let lib1_scope_span = lib1_scope_span.expect("lib1 scope span not found");
334        let lib2_scope_span = lib2_scope_span.expect("lib2 scope span not found");
335
336        assert_eq!(lib1_scope_span.scope.as_ref().unwrap().name, "lib1");
337        assert_eq!(lib2_scope_span.scope.as_ref().unwrap().name, "lib2");
338
339        assert_eq!(lib1_scope_span.spans.len(), 2);
340        assert_eq!(lib2_scope_span.spans.len(), 1);
341
342        assert_eq!(
343            lib1_scope_span.spans[0].trace_id,
344            span_data1.span_context.trace_id().to_bytes().to_vec()
345        );
346        assert_eq!(
347            lib1_scope_span.spans[1].trace_id,
348            span_data2.span_context.trace_id().to_bytes().to_vec()
349        );
350        assert_eq!(
351            lib2_scope_span.spans[0].trace_id,
352            span_data3.span_context.trace_id().to_bytes().to_vec()
353        );
354    }
355}