1use crate::parser::{Confidence, FormatParser, STRONG};
23use crate::table::TableBuilder;
24use ax_core::{AxError, Column, Value};
25use serde_json::Value as J;
26use std::collections::BTreeMap;
27
/// Parser for OpenTelemetry trace exports in the OTLP/JSON encoding.
///
/// Stateless: every span in the document becomes one table row, with the
/// enclosing resource and scope metadata replicated onto each row.
#[derive(Clone, Debug, Default)]
pub struct OtlpParser;
30
31fn decode_anyvalue(v: &J) -> Value {
33 let Some(obj) = v.as_object() else {
34 return Value::Null;
35 };
36 if let Some(s) = obj.get("stringValue").and_then(J::as_str) {
37 return Value::Str(s.to_string());
38 }
39 if let Some(iv) = obj.get("intValue") {
40 return match iv {
41 J::String(s) => s
43 .parse::<i64>()
44 .map_or_else(|_| Value::Str(s.clone()), Value::Int),
45 J::Number(n) => n.as_i64().map_or(Value::Null, Value::Int),
46 _ => Value::Null,
47 };
48 }
49 if let Some(dv) = obj.get("doubleValue") {
50 return dv
51 .as_f64()
52 .filter(|f| f.is_finite())
53 .map_or(Value::Null, Value::Float);
54 }
55 if let Some(bv) = obj.get("boolValue") {
56 return bv.as_bool().map_or(Value::Null, Value::Bool);
57 }
58 for key in ["arrayValue", "kvlistValue", "bytesValue"] {
61 if let Some(inner) = obj.get(key) {
62 return Value::Str(inner.to_string());
63 }
64 }
65 Value::Null
66}
67
68fn collect_attributes(attrs: Option<&J>, prefix: &str, row: &mut BTreeMap<String, Value>) {
70 for kv in attrs.and_then(J::as_array).into_iter().flatten() {
71 if let (Some(k), Some(val)) = (kv.get("key").and_then(J::as_str), kv.get("value")) {
72 row.insert(format!("{prefix}{k}"), decode_anyvalue(val));
73 }
74 }
75}
76
77fn unix_nano(v: Option<&J>) -> Option<i64> {
79 match v? {
80 J::String(s) => s.parse().ok(),
81 J::Number(n) => n.as_i64(),
82 _ => None,
83 }
84}
85
86fn insert_str(row: &mut BTreeMap<String, Value>, span: &J, field: &str, column: &str) {
88 if let Some(s) = span.get(field).and_then(J::as_str) {
89 row.insert(column.to_string(), Value::Str(s.to_string()));
90 }
91}
92
93fn add_span_fields(span: &J, row: &mut BTreeMap<String, Value>) {
96 insert_str(row, span, "traceId", "traceId");
97 insert_str(row, span, "spanId", "spanId");
98 insert_str(row, span, "name", "name");
99 if let Some(p) = span.get("parentSpanId").and_then(J::as_str) {
101 if !p.is_empty() {
102 row.insert("parentSpanId".to_string(), Value::Str(p.to_string()));
103 }
104 }
105 if let Some(kind) = span.get("kind").and_then(J::as_i64) {
106 row.insert("kind".to_string(), Value::Int(kind));
107 }
108 let start = unix_nano(span.get("startTimeUnixNano"));
109 let end = unix_nano(span.get("endTimeUnixNano"));
110 if let Some(s) = start {
111 row.insert("startTimeUnixNano".to_string(), Value::Int(s));
112 }
113 if let Some(e) = end {
114 row.insert("endTimeUnixNano".to_string(), Value::Int(e));
115 }
116 if let (Some(s), Some(e)) = (start, end) {
118 if let Some(d) = e.checked_sub(s) {
119 row.insert("durationNanos".to_string(), Value::Int(d));
120 }
121 }
122 if let Some(status) = span.get("status") {
123 if let Some(code) = status.get("code").and_then(J::as_i64) {
124 row.insert("statusCode".to_string(), Value::Int(code));
125 }
126 insert_str(row, status, "message", "statusMessage");
127 }
128}
129
130impl OtlpParser {
131 fn err(&self, msg: impl std::fmt::Display) -> AxError {
132 AxError::Parse {
133 format: self.id().to_string(),
134 message: msg.to_string(),
135 }
136 }
137}
138
139impl FormatParser for OtlpParser {
140 fn id(&self) -> &'static str {
141 "otlp"
142 }
143 fn extensions(&self) -> &'static [&'static str] {
144 &["otlp"]
145 }
146 fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
147 let value: J = serde_json::from_slice(bytes).ok()?;
148 value
149 .get("resourceSpans")
150 .is_some_and(J::is_array)
151 .then_some(STRONG)
152 }
153 fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
154 let root: J = serde_json::from_slice(bytes).map_err(|e| self.err(e))?;
155 let resource_spans = root
156 .get("resourceSpans")
157 .and_then(J::as_array)
158 .ok_or_else(|| self.err("not OTLP traces: missing 'resourceSpans' array"))?;
159
160 let mut builder = TableBuilder::new();
161 for rs in resource_spans {
162 let mut resource_row: BTreeMap<String, Value> = BTreeMap::new();
163 if let Some(resource) = rs.get("resource") {
164 collect_attributes(resource.get("attributes"), "resource.", &mut resource_row);
165 }
166 for ss in rs
167 .get("scopeSpans")
168 .and_then(J::as_array)
169 .into_iter()
170 .flatten()
171 {
172 let mut scope_row = resource_row.clone();
173 if let Some(scope) = ss.get("scope") {
174 insert_str(&mut scope_row, scope, "name", "scope.name");
175 insert_str(&mut scope_row, scope, "version", "scope.version");
176 }
177 for span in ss.get("spans").and_then(J::as_array).into_iter().flatten() {
178 let mut row = scope_row.clone();
179 collect_attributes(span.get("attributes"), "", &mut row);
180 add_span_fields(span, &mut row);
181 builder.push_row(row);
182 }
183 }
184 }
185 Ok(builder.finish())
186 }
187}
188
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// One resource (`service.name = checkout`) with one scope and two spans:
    /// - a root `GET /cart` span with one attribute of each scalar type;
    /// - a child `db.query` span with an error status.
    const TRACE: &str = r#"{
  "resourceSpans": [{
    "resource": {
      "attributes": [
        {"key": "service.name", "value": {"stringValue": "checkout"}}
      ]
    },
    "scopeSpans": [{
      "scope": {"name": "tracer", "version": "1.2.0"},
      "spans": [
        {
          "traceId": "5b8efff798038103d269b633813fc60c",
          "spanId": "eee19b7ec3c1b174",
          "name": "GET /cart",
          "kind": 2,
          "startTimeUnixNano": "1544712660000000000",
          "endTimeUnixNano": "1544712660500000000",
          "attributes": [
            {"key": "http.method", "value": {"stringValue": "GET"}},
            {"key": "http.status_code", "value": {"intValue": "200"}},
            {"key": "sampling.ratio", "value": {"doubleValue": 0.25}},
            {"key": "cache.hit", "value": {"boolValue": true}}
          ],
          "status": {"code": 1}
        },
        {
          "traceId": "5b8efff798038103d269b633813fc60c",
          "spanId": "f00d",
          "parentSpanId": "eee19b7ec3c1b174",
          "name": "db.query",
          "kind": 3,
          "startTimeUnixNano": "1544712660100000000",
          "endTimeUnixNano": "1544712660900000000",
          "status": {"code": 2, "message": "timeout"}
        }
      ]
    }]
  }]
}"#;

    /// Parses `text` as OTLP, panicking on any error.
    fn parse(text: &str) -> Vec<Column> {
        OtlpParser.parse("-", text.as_bytes()).unwrap()
    }

    /// Finds the column called `name`, panicking with that name if it is absent.
    fn col<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        cols.iter()
            .find(|c| c.name == name)
            .unwrap_or_else(|| panic!("missing column {name}"))
    }

    #[test]
    fn one_row_per_span_with_duration() {
        let cols = parse(TRACE);
        let duration = col(&cols, "durationNanos");
        assert_eq!(duration.ty, ColType::Int);
        assert_eq!(duration.cells.len(), 2, "two spans");
        // Root span lasts 0.5 s, child span 0.8 s.
        assert_eq!(duration.cells[0], Value::Int(500_000_000));
        assert_eq!(duration.cells[1], Value::Int(800_000_000));
    }

    #[test]
    fn synthesized_span_fields() {
        let cols = parse(TRACE);
        assert_eq!(
            col(&cols, "startTimeUnixNano").cells[0],
            Value::Int(1_544_712_660_000_000_000)
        );
        assert_eq!(col(&cols, "kind").cells[0], Value::Int(2));
        assert_eq!(col(&cols, "name").cells[1], Value::Str("db.query".into()));
        assert_eq!(col(&cols, "statusCode").cells[1], Value::Int(2));
        assert_eq!(
            col(&cols, "statusMessage").cells[1],
            Value::Str("timeout".into())
        );
    }

    #[test]
    fn parent_span_id_absent_on_root_present_on_child() {
        let cols = parse(TRACE);
        let parent = col(&cols, "parentSpanId");
        assert_eq!(parent.cells[0], Value::Null, "root span has no parent");
        assert_eq!(parent.cells[1], Value::Str("eee19b7ec3c1b174".into()));
    }

    #[test]
    fn resource_and_scope_attributes_flatten_onto_every_span() {
        let cols = parse(TRACE);
        let service = col(&cols, "resource.service.name");
        assert_eq!(service.cells[0], Value::Str("checkout".into()));
        assert_eq!(service.cells[1], Value::Str("checkout".into()), "replicated");
        assert_eq!(
            col(&cols, "scope.name").cells[0],
            Value::Str("tracer".into())
        );
        assert_eq!(
            col(&cols, "scope.version").cells[0],
            Value::Str("1.2.0".into())
        );
    }

    #[test]
    fn any_value_decoding_per_type() {
        let cols = parse(TRACE);
        assert_eq!(col(&cols, "http.method").cells[0], Value::Str("GET".into()));
        assert_eq!(col(&cols, "http.status_code").cells[0], Value::Int(200));
        assert_eq!(col(&cols, "sampling.ratio").cells[0], Value::Float(0.25));
        assert_eq!(col(&cols, "cache.hit").cells[0], Value::Bool(true));
        // The child span has no attributes, so its cells are null.
        assert_eq!(col(&cols, "http.method").cells[1], Value::Null);
    }

    #[test]
    fn decode_anyvalue_units() {
        assert_eq!(
            decode_anyvalue(&serde_json::json!({"intValue": 7})),
            Value::Int(7)
        );
        assert_eq!(
            decode_anyvalue(&serde_json::json!({"arrayValue": {"values": []}})),
            Value::Str("{\"values\":[]}".into())
        );
        assert_eq!(decode_anyvalue(&serde_json::json!({})), Value::Null);
        assert_eq!(
            decode_anyvalue(&serde_json::json!("not an object")),
            Value::Null
        );
    }

    #[test]
    fn malformed_and_non_otlp_error() {
        assert!(matches!(
            OtlpParser.parse("-", b"{not json"),
            Err(AxError::Parse { .. })
        ));
        assert!(matches!(
            OtlpParser.parse("-", br#"{"foo": 1}"#),
            Err(AxError::Parse { .. })
        ));
    }

    #[test]
    fn sniff_keys_on_resource_spans() {
        assert_eq!(OtlpParser.sniff(TRACE.as_bytes()), Some(STRONG));
        assert_eq!(OtlpParser.sniff(br#"{"resourceSpans": []}"#), Some(STRONG));
        assert_eq!(OtlpParser.sniff(br#"{"resourceSpans": 1}"#), None);
        assert_eq!(OtlpParser.sniff(br#"{"resourceLogs": []}"#), None);
        assert_eq!(OtlpParser.sniff(b"{\"a\":1}"), None);
        assert_eq!(OtlpParser.sniff(b"a,b,c\n1,2,3"), None);
    }

    #[test]
    fn unix_nano_accepts_string_and_number() {
        assert_eq!(unix_nano(Some(&serde_json::json!("123"))), Some(123));
        assert_eq!(unix_nano(Some(&serde_json::json!(456))), Some(456));
        assert_eq!(unix_nano(Some(&serde_json::json!(true))), None);
        assert_eq!(unix_nano(None), None);
    }

    #[test]
    fn claims_otlp_extension() {
        assert_eq!(OtlpParser.extensions(), &["otlp"]);
    }

    #[test]
    fn resolves_by_extension_and_content() {
        let registry = crate::parser::ParserRegistry::default();
        assert_eq!(
            registry.resolve("dump.otlp", TRACE.as_bytes()).unwrap().id(),
            "otlp"
        );
        assert_eq!(registry.resolve("-", TRACE.as_bytes()).unwrap().id(), "otlp");
    }
}