Skip to main content

otlp_embedded/
trace.rs

1use std::{
2    collections::{HashMap, hash_map::Entry},
3    sync::Arc,
4    time::{Duration, SystemTime},
5};
6
7use crate::proto::{
8    common::v1::{AnyValue, KeyValue, any_value},
9    resource::v1::Resource,
10    trace::v1::Span,
11};
12use get_size2::GetSize;
13use itertools::Itertools;
14use serde_json::json;
15
16use crate::jaeger::model::{JaegerProcess, span_to_jaeger_json};
17
/// Trace identifier, held as an owned byte vector.
pub(crate) type TraceId = Vec<u8>;
/// Span identifier, held as an owned byte vector. This is the key type of `Trace::spans`.
pub(crate) type SpanId = Vec<u8>;
20
/// A span paired with the resource it belongs to.
#[derive(Debug, Clone, GetSize)]
pub(crate) struct SpanValue {
    /// The span itself; its `name` is what [`SpanValue::operation`] returns.
    pub span: Box<Span>,
    /// The resource whose attributes are searched for `service.name` and
    /// `service.instance.id`.
    pub resource: Arc<Resource>,
}
26
27fn extract_string<'a>(attr: &'a [KeyValue], key: &'static str) -> &'a str {
28    attr.iter()
29        .find(|a| a.key == key)
30        .and_then(|kv| {
31            if let Some(AnyValue {
32                value: Some(any_value::Value::StringValue(str)),
33            }) = &kv.value
34            {
35                Some(str.as_str())
36            } else {
37                None
38            }
39        })
40        .unwrap_or("unknown")
41}
42
43impl SpanValue {
44    pub fn service_name(&self) -> &str {
45        extract_string(&self.resource.attributes, "service.name")
46    }
47
48    pub fn service_instance_id(&self) -> &str {
49        extract_string(&self.resource.attributes, "service.instance.id")
50    }
51
52    pub fn operation(&self) -> &str {
53        self.span.name.as_str()
54    }
55}
56
/// One entry of a trace's span map.
#[derive(Debug, Clone, GetSize)]
pub(crate) enum SpanNode {
    /// Stands in for a span that was named as a parent but has not been recorded itself.
    Placeholder,
    /// A span that has been recorded.
    Value(SpanValue),
}
62
/// A trace that consists of multiple spans in a tree structure.
// TODO: should we cache the size?
#[derive(Debug, Clone, GetSize)]
pub struct Trace {
    /// Spans keyed by span ID. A parent that has been referenced but not yet recorded is
    /// held as a `SpanNode::Placeholder`.
    pub(crate) spans: HashMap<SpanId, SpanNode>,
    /// The latest end time among the spans added so far; `SystemTime::UNIX_EPOCH` for a
    /// default-constructed trace.
    pub(crate) end_time: SystemTime,
}
70
71impl Default for Trace {
72    fn default() -> Self {
73        Self {
74            spans: Default::default(),
75            end_time: SystemTime::UNIX_EPOCH,
76        }
77    }
78}
79
80impl Trace {
81    pub(crate) fn add_value(&mut self, mut value: SpanValue) {
82        let span_id = &value.span.span_id;
83        let parent_id = &value.span.parent_span_id;
84
85        if span_id.is_empty() {
86            return;
87        }
88
89        // If there's a parent and not recorded yet, add a placeholder.
90        if !parent_id.is_empty() {
91            self.spans
92                .entry(parent_id.clone())
93                .or_insert(SpanNode::Placeholder);
94        }
95
96        // Add a `message` attribute from the event name. Otherwise, it won't be displayed in Tempo.
97        for event in &mut value.span.events {
98            const MESSAGE: &str = "message";
99
100            event.attributes.push(KeyValue {
101                key: MESSAGE.to_string(),
102                value: Some(AnyValue {
103                    value: Some(any_value::Value::StringValue(event.name.clone())),
104                }),
105            });
106        }
107
108        self.end_time = (self.end_time)
109            .max(SystemTime::UNIX_EPOCH + Duration::from_nanos(value.span.end_time_unix_nano as _));
110
111        match self.spans.entry(span_id.clone()) {
112            Entry::Occupied(o) => {
113                let o = o.into_mut();
114                match o {
115                    SpanNode::Placeholder => *o = SpanNode::Value(value),
116                    SpanNode::Value(o) => {
117                        // Update the span with the new value.
118                        o.span.attributes.extend(value.span.attributes);
119                        o.span.events.extend(value.span.events);
120                        o.span.start_time_unix_nano =
121                            (o.span.start_time_unix_nano).min(value.span.start_time_unix_nano);
122                        o.span.end_time_unix_nano =
123                            (o.span.end_time_unix_nano).max(value.span.end_time_unix_nano);
124                    }
125                }
126            }
127            Entry::Vacant(v) => {
128                v.insert(SpanNode::Value(value));
129            }
130        }
131    }
132
133    fn iter_valid(&self) -> impl Iterator<Item = &SpanValue> {
134        self.spans.values().filter_map(|node| match node {
135            SpanNode::Placeholder => None,
136            SpanNode::Value(value) => Some(value),
137        })
138    }
139
140    /// Check if the trace is complete.
141    pub fn is_complete(&self) -> bool {
142        // Since all new non-root values recorded will add a placeholder for the parent.
143        // If there's no placeholder, it means the trace is complete.
144        self.spans.values().all(|v| matches!(v, SpanNode::Value(_)))
145    }
146
147    /// Get the trace ID.
148    pub fn id(&self) -> &[u8] {
149        &self.iter_valid().next().unwrap().span.trace_id
150    }
151
152    /// Get the trace ID as a hex string.
153    pub fn hex_id(&self) -> String {
154        hex::encode(self.id())
155    }
156
157    /// Convert the trace into a JSON value that can be directly imported into Grafana Tempo
158    /// as a batch.
159    pub fn to_tempo_batch(&self) -> serde_json::Value {
160        let entries = self
161            .iter_valid()
162            .map(|v| {
163                json!({
164                    "resource": &*v.resource,
165                    "instrumentationLibrarySpans": [{
166                        "spans": [v.span]
167                    }]
168                })
169            })
170            .collect_vec();
171
172        json!({
173            "batches": entries
174        })
175    }
176
177    /// Convert the trace into a JSON value that can be directly imported into Jaeger
178    /// as a batch.
179    pub fn to_jaeger_batch(&self) -> serde_json::Value {
180        json!({
181            "data": [
182                self.to_jaeger()
183            ]
184        })
185    }
186
187    pub(crate) fn to_jaeger(&self) -> serde_json::Value {
188        let mut processes = HashMap::new();
189
190        let entries = self
191            .iter_valid()
192            .map(|v| {
193                let process = JaegerProcess::from(v);
194                let key = process.key.clone();
195                processes.insert(key.clone(), process);
196
197                span_to_jaeger_json(*v.span.clone(), key)
198            })
199            .collect_vec();
200
201        if entries.is_empty() {
202            return json!({});
203        }
204
205        let trace_id = &entries[0]["traceID"];
206
207        json!({
208            "traceID": trace_id,
209            "spans": entries,
210            "processes": processes,
211        })
212    }
213}
214
215impl Trace {
216    pub(crate) fn root_span(&self) -> Option<&SpanValue> {
217        self.iter_valid().find(|v| v.span.parent_span_id.is_empty())
218    }
219
220    /// Get the service name of the root span in this trace.
221    ///
222    /// Returns `None` if the trace is not complete and the root span is not received.
223    pub fn service_name(&self) -> Option<&str> {
224        self.root_span().map(|v| v.service_name())
225    }
226
227    /// Get the service instance ID of the root span in this trace.
228    ///
229    /// Returns `None` if the trace is not complete and the root span is not received.
230    pub fn service_instance_id(&self) -> Option<&str> {
231        self.root_span().map(|v| v.service_instance_id())
232    }
233
234    /// Get the operation (span name) of the root span in this trace.
235    ///
236    /// Returns `None` if the trace is not complete and the root span is not received.
237    pub fn operation(&self) -> Option<&str> {
238        self.root_span().map(|v| v.operation())
239    }
240}