1use std::{
2 collections::{HashMap, hash_map::Entry},
3 sync::Arc,
4 time::{Duration, SystemTime},
5};
6
7use crate::proto::{
8 common::v1::{AnyValue, KeyValue, any_value},
9 resource::v1::Resource,
10 trace::v1::Span,
11};
12use get_size2::GetSize;
13use itertools::Itertools;
14use serde_json::json;
15
16use crate::jaeger::model::{JaegerProcess, span_to_jaeger_json};
17
18pub(crate) type TraceId = Vec<u8>;
19pub(crate) type SpanId = Vec<u8>;
20
21#[derive(Debug, Clone, GetSize)]
22pub(crate) struct SpanValue {
23 pub span: Box<Span>,
24 pub resource: Arc<Resource>,
25}
26
27fn extract_string<'a>(attr: &'a [KeyValue], key: &'static str) -> &'a str {
28 attr.iter()
29 .find(|a| a.key == key)
30 .and_then(|kv| {
31 if let Some(AnyValue {
32 value: Some(any_value::Value::StringValue(str)),
33 }) = &kv.value
34 {
35 Some(str.as_str())
36 } else {
37 None
38 }
39 })
40 .unwrap_or("unknown")
41}
42
43impl SpanValue {
44 pub fn service_name(&self) -> &str {
45 extract_string(&self.resource.attributes, "service.name")
46 }
47
48 pub fn service_instance_id(&self) -> &str {
49 extract_string(&self.resource.attributes, "service.instance.id")
50 }
51
52 pub fn operation(&self) -> &str {
53 self.span.name.as_str()
54 }
55}
56
57#[derive(Debug, Clone, GetSize)]
58pub(crate) enum SpanNode {
59 Placeholder,
60 Value(SpanValue),
61}
62
63#[derive(Debug, Clone, GetSize)]
66pub struct Trace {
67 pub(crate) spans: HashMap<SpanId, SpanNode>,
68 pub(crate) end_time: SystemTime,
69}
70
71impl Default for Trace {
72 fn default() -> Self {
73 Self {
74 spans: Default::default(),
75 end_time: SystemTime::UNIX_EPOCH,
76 }
77 }
78}
79
80impl Trace {
81 pub(crate) fn add_value(&mut self, mut value: SpanValue) {
82 let span_id = &value.span.span_id;
83 let parent_id = &value.span.parent_span_id;
84
85 if span_id.is_empty() {
86 return;
87 }
88
89 if !parent_id.is_empty() {
91 self.spans
92 .entry(parent_id.clone())
93 .or_insert(SpanNode::Placeholder);
94 }
95
96 for event in &mut value.span.events {
98 const MESSAGE: &str = "message";
99
100 event.attributes.push(KeyValue {
101 key: MESSAGE.to_string(),
102 value: Some(AnyValue {
103 value: Some(any_value::Value::StringValue(event.name.clone())),
104 }),
105 });
106 }
107
108 self.end_time = (self.end_time)
109 .max(SystemTime::UNIX_EPOCH + Duration::from_nanos(value.span.end_time_unix_nano as _));
110
111 match self.spans.entry(span_id.clone()) {
112 Entry::Occupied(o) => {
113 let o = o.into_mut();
114 match o {
115 SpanNode::Placeholder => *o = SpanNode::Value(value),
116 SpanNode::Value(o) => {
117 o.span.attributes.extend(value.span.attributes);
119 o.span.events.extend(value.span.events);
120 o.span.start_time_unix_nano =
121 (o.span.start_time_unix_nano).min(value.span.start_time_unix_nano);
122 o.span.end_time_unix_nano =
123 (o.span.end_time_unix_nano).max(value.span.end_time_unix_nano);
124 }
125 }
126 }
127 Entry::Vacant(v) => {
128 v.insert(SpanNode::Value(value));
129 }
130 }
131 }
132
133 fn iter_valid(&self) -> impl Iterator<Item = &SpanValue> {
134 self.spans.values().filter_map(|node| match node {
135 SpanNode::Placeholder => None,
136 SpanNode::Value(value) => Some(value),
137 })
138 }
139
140 pub fn is_complete(&self) -> bool {
142 self.spans.values().all(|v| matches!(v, SpanNode::Value(_)))
145 }
146
147 pub fn id(&self) -> &[u8] {
149 &self.iter_valid().next().unwrap().span.trace_id
150 }
151
152 pub fn hex_id(&self) -> String {
154 hex::encode(self.id())
155 }
156
157 pub fn to_tempo_batch(&self) -> serde_json::Value {
160 let entries = self
161 .iter_valid()
162 .map(|v| {
163 json!({
164 "resource": &*v.resource,
165 "instrumentationLibrarySpans": [{
166 "spans": [v.span]
167 }]
168 })
169 })
170 .collect_vec();
171
172 json!({
173 "batches": entries
174 })
175 }
176
177 pub fn to_jaeger_batch(&self) -> serde_json::Value {
180 json!({
181 "data": [
182 self.to_jaeger()
183 ]
184 })
185 }
186
187 pub(crate) fn to_jaeger(&self) -> serde_json::Value {
188 let mut processes = HashMap::new();
189
190 let entries = self
191 .iter_valid()
192 .map(|v| {
193 let process = JaegerProcess::from(v);
194 let key = process.key.clone();
195 processes.insert(key.clone(), process);
196
197 span_to_jaeger_json(*v.span.clone(), key)
198 })
199 .collect_vec();
200
201 if entries.is_empty() {
202 return json!({});
203 }
204
205 let trace_id = &entries[0]["traceID"];
206
207 json!({
208 "traceID": trace_id,
209 "spans": entries,
210 "processes": processes,
211 })
212 }
213}
214
215impl Trace {
216 pub(crate) fn root_span(&self) -> Option<&SpanValue> {
217 self.iter_valid().find(|v| v.span.parent_span_id.is_empty())
218 }
219
220 pub fn service_name(&self) -> Option<&str> {
224 self.root_span().map(|v| v.service_name())
225 }
226
227 pub fn service_instance_id(&self) -> Option<&str> {
231 self.root_span().map(|v| v.service_instance_id())
232 }
233
234 pub fn operation(&self) -> Option<&str> {
238 self.root_span().map(|v| v.operation())
239 }
240}