Skip to main content

anomalyx_normalize/parsers/
otlp.rs

1//! OpenTelemetry OTLP/JSON trace parser — the emerging default telemetry wire.
2//!
3//! An OTLP `ExportTraceServiceRequest` nests
4//! `resourceSpans[].scopeSpans[].spans[]`. We flatten it to **one row per span**
5//! and synthesize the columns the detectors want:
6//!
7//! - `durationNanos` = `endTimeUnixNano - startTimeUnixNano` → span-duration
8//!   `point` outliers;
9//! - `statusCode` → error-rate `dist` drift across a baseline;
10//! - `startTimeUnixNano` → `--cadence` on emit intervals.
11//!
12//! Resource attributes (`resource.<key>`), scope (`scope.name`/`scope.version`),
13//! and span attributes (the bare `<key>`) are flattened too, decoding the OTLP
14//! `AnyValue` wrapper (`stringValue`/`intValue`/`doubleValue`/`boolValue`; arrays
15//! and kvlists are kept as their canonical JSON string). OTLP/JSON encodes 64-bit
16//! ints (times, `intValue`) as strings — we parse them back to `Int`.
17//!
18//! Claims `.otlp`; otherwise detected by the unmistakable top-level
19//! `resourceSpans` array. (A `.json`-named dump routes to the generic JSON parser
20//! by extension; pipe it on stdin to get OTLP-aware parsing.)
21
22use crate::parser::{Confidence, FormatParser, STRONG};
23use crate::table::TableBuilder;
24use ax_core::{AxError, Column, Value};
25use serde_json::Value as J;
26use std::collections::BTreeMap;
27
/// Stateless [`FormatParser`] for OTLP/JSON trace exports (`resourceSpans`).
#[derive(Clone, Debug, Default)]
pub struct OtlpParser;
30
31/// Decodes an OTLP `AnyValue` wrapper object into a [`Value`].
32fn decode_anyvalue(v: &J) -> Value {
33    let Some(obj) = v.as_object() else {
34        return Value::Null;
35    };
36    if let Some(s) = obj.get("stringValue").and_then(J::as_str) {
37        return Value::Str(s.to_string());
38    }
39    if let Some(iv) = obj.get("intValue") {
40        return match iv {
41            // OTLP/JSON encodes int64 as a string; tolerate a bare number too.
42            J::String(s) => s
43                .parse::<i64>()
44                .map_or_else(|_| Value::Str(s.clone()), Value::Int),
45            J::Number(n) => n.as_i64().map_or(Value::Null, Value::Int),
46            _ => Value::Null,
47        };
48    }
49    if let Some(dv) = obj.get("doubleValue") {
50        return dv
51            .as_f64()
52            .filter(|f| f.is_finite())
53            .map_or(Value::Null, Value::Float);
54    }
55    if let Some(bv) = obj.get("boolValue") {
56        return bv.as_bool().map_or(Value::Null, Value::Bool);
57    }
58    // arrayValue / kvlistValue / bytesValue: keep the canonical JSON, don't
59    // explode into columns. An empty `{}` AnyValue is honest absence.
60    for key in ["arrayValue", "kvlistValue", "bytesValue"] {
61        if let Some(inner) = obj.get(key) {
62            return Value::Str(inner.to_string());
63        }
64    }
65    Value::Null
66}
67
68/// Flattens an OTLP attribute list (`[{key, value}]`) into `row`, key-prefixed.
69fn collect_attributes(attrs: Option<&J>, prefix: &str, row: &mut BTreeMap<String, Value>) {
70    for kv in attrs.and_then(J::as_array).into_iter().flatten() {
71        if let (Some(k), Some(val)) = (kv.get("key").and_then(J::as_str), kv.get("value")) {
72            row.insert(format!("{prefix}{k}"), decode_anyvalue(val));
73        }
74    }
75}
76
77/// Reads an OTLP unix-nano timestamp (string-encoded uint64, or a bare number).
78fn unix_nano(v: Option<&J>) -> Option<i64> {
79    match v? {
80        J::String(s) => s.parse().ok(),
81        J::Number(n) => n.as_i64(),
82        _ => None,
83    }
84}
85
86/// Inserts a string span field as a column, if present.
87fn insert_str(row: &mut BTreeMap<String, Value>, span: &J, field: &str, column: &str) {
88    if let Some(s) = span.get(field).and_then(J::as_str) {
89        row.insert(column.to_string(), Value::Str(s.to_string()));
90    }
91}
92
93/// Adds the synthesized span columns. Called after attributes so the structural
94/// fields win any (rare) collision with an attribute of the same name.
95fn add_span_fields(span: &J, row: &mut BTreeMap<String, Value>) {
96    insert_str(row, span, "traceId", "traceId");
97    insert_str(row, span, "spanId", "spanId");
98    insert_str(row, span, "name", "name");
99    // A root span has an empty parentSpanId — record it as absent, not "".
100    if let Some(p) = span.get("parentSpanId").and_then(J::as_str) {
101        if !p.is_empty() {
102            row.insert("parentSpanId".to_string(), Value::Str(p.to_string()));
103        }
104    }
105    if let Some(kind) = span.get("kind").and_then(J::as_i64) {
106        row.insert("kind".to_string(), Value::Int(kind));
107    }
108    let start = unix_nano(span.get("startTimeUnixNano"));
109    let end = unix_nano(span.get("endTimeUnixNano"));
110    if let Some(s) = start {
111        row.insert("startTimeUnixNano".to_string(), Value::Int(s));
112    }
113    if let Some(e) = end {
114        row.insert("endTimeUnixNano".to_string(), Value::Int(e));
115    }
116    // The headline metric: span duration. checked_sub guards a malformed pair.
117    if let (Some(s), Some(e)) = (start, end) {
118        if let Some(d) = e.checked_sub(s) {
119            row.insert("durationNanos".to_string(), Value::Int(d));
120        }
121    }
122    if let Some(status) = span.get("status") {
123        if let Some(code) = status.get("code").and_then(J::as_i64) {
124            row.insert("statusCode".to_string(), Value::Int(code));
125        }
126        insert_str(row, status, "message", "statusMessage");
127    }
128}
129
130impl OtlpParser {
131    fn err(&self, msg: impl std::fmt::Display) -> AxError {
132        AxError::Parse {
133            format: self.id().to_string(),
134            message: msg.to_string(),
135        }
136    }
137}
138
139impl FormatParser for OtlpParser {
140    fn id(&self) -> &'static str {
141        "otlp"
142    }
143    fn extensions(&self) -> &'static [&'static str] {
144        &["otlp"]
145    }
146    fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
147        let value: J = serde_json::from_slice(bytes).ok()?;
148        value
149            .get("resourceSpans")
150            .is_some_and(J::is_array)
151            .then_some(STRONG)
152    }
153    fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
154        let root: J = serde_json::from_slice(bytes).map_err(|e| self.err(e))?;
155        let resource_spans = root
156            .get("resourceSpans")
157            .and_then(J::as_array)
158            .ok_or_else(|| self.err("not OTLP traces: missing 'resourceSpans' array"))?;
159
160        let mut builder = TableBuilder::new();
161        for rs in resource_spans {
162            let mut resource_row: BTreeMap<String, Value> = BTreeMap::new();
163            if let Some(resource) = rs.get("resource") {
164                collect_attributes(resource.get("attributes"), "resource.", &mut resource_row);
165            }
166            for ss in rs
167                .get("scopeSpans")
168                .and_then(J::as_array)
169                .into_iter()
170                .flatten()
171            {
172                let mut scope_row = resource_row.clone();
173                if let Some(scope) = ss.get("scope") {
174                    insert_str(&mut scope_row, scope, "name", "scope.name");
175                    insert_str(&mut scope_row, scope, "version", "scope.version");
176                }
177                for span in ss.get("spans").and_then(J::as_array).into_iter().flatten() {
178                    let mut row = scope_row.clone();
179                    collect_attributes(span.get("attributes"), "", &mut row);
180                    add_span_fields(span, &mut row);
181                    builder.push_row(row);
182                }
183            }
184        }
185        Ok(builder.finish())
186    }
187}
188
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// One `checkout` resource with one scope holding two spans of one trace:
    /// a root `GET /cart` span (0.5 s, four typed attributes, status code 1)
    /// and its `db.query` child (0.8 s, status code 2 with message "timeout").
    const SAMPLE_TRACE: &str = r#"{
      "resourceSpans": [{
        "resource": {
          "attributes": [
            {"key": "service.name", "value": {"stringValue": "checkout"}}
          ]
        },
        "scopeSpans": [{
          "scope": {"name": "tracer", "version": "1.2.0"},
          "spans": [
            {
              "traceId": "5b8efff798038103d269b633813fc60c",
              "spanId": "eee19b7ec3c1b174",
              "name": "GET /cart",
              "kind": 2,
              "startTimeUnixNano": "1544712660000000000",
              "endTimeUnixNano": "1544712660500000000",
              "attributes": [
                {"key": "http.method", "value": {"stringValue": "GET"}},
                {"key": "http.status_code", "value": {"intValue": "200"}},
                {"key": "sampling.ratio", "value": {"doubleValue": 0.25}},
                {"key": "cache.hit", "value": {"boolValue": true}}
              ],
              "status": {"code": 1}
            },
            {
              "traceId": "5b8efff798038103d269b633813fc60c",
              "spanId": "f00d",
              "parentSpanId": "eee19b7ec3c1b174",
              "name": "db.query",
              "kind": 3,
              "startTimeUnixNano": "1544712660100000000",
              "endTimeUnixNano": "1544712660900000000",
              "status": {"code": 2, "message": "timeout"}
            }
          ]
        }]
      }]
    }"#;

    /// Parses `text` as OTLP, failing the test on any parse error.
    fn parse_ok(text: &str) -> Vec<Column> {
        OtlpParser.parse("-", text.as_bytes()).unwrap()
    }

    /// Finds the column named `name`, failing the test if there is none.
    fn column<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        match cols.iter().find(|c| c.name == name) {
            Some(found) => found,
            None => panic!("missing column {name}"),
        }
    }

    #[test]
    fn one_row_per_span_with_duration() {
        let cols = parse_ok(SAMPLE_TRACE);
        let durations = column(&cols, "durationNanos");
        assert_eq!(durations.ty, ColType::Int);
        // 0.5 s for the root span, 0.8 s for the child.
        let expected = [Value::Int(500_000_000), Value::Int(800_000_000)];
        assert_eq!(durations.cells.len(), expected.len(), "two spans");
        for (got, want) in durations.cells.iter().zip(&expected) {
            assert_eq!(got, want);
        }
    }

    #[test]
    fn synthesized_span_fields() {
        let cols = parse_ok(SAMPLE_TRACE);
        let expectations: [(&str, usize, Value); 5] = [
            ("startTimeUnixNano", 0, Value::Int(1_544_712_660_000_000_000)),
            ("kind", 0, Value::Int(2)),
            ("name", 1, Value::Str("db.query".into())),
            ("statusCode", 1, Value::Int(2)),
            ("statusMessage", 1, Value::Str("timeout".into())),
        ];
        for (name, row, want) in expectations {
            assert_eq!(column(&cols, name).cells[row], want, "{name}[{row}]");
        }
    }

    #[test]
    fn parent_span_id_absent_on_root_present_on_child() {
        let cols = parse_ok(SAMPLE_TRACE);
        let parents = &column(&cols, "parentSpanId").cells;
        assert_eq!(parents[0], Value::Null, "root span has no parent");
        assert_eq!(parents[1], Value::Str("eee19b7ec3c1b174".into()));
    }

    #[test]
    fn resource_and_scope_attributes_flatten_onto_every_span() {
        let cols = parse_ok(SAMPLE_TRACE);
        let service = column(&cols, "resource.service.name");
        for row in [0_usize, 1] {
            assert_eq!(
                service.cells[row],
                Value::Str("checkout".into()),
                "replicated onto row {row}"
            );
        }
        for (name, want) in [("scope.name", "tracer"), ("scope.version", "1.2.0")] {
            assert_eq!(column(&cols, name).cells[0], Value::Str(want.into()));
        }
    }

    #[test]
    fn any_value_decoding_per_type() {
        let cols = parse_ok(SAMPLE_TRACE);
        let root_span: [(&str, Value); 4] = [
            ("http.method", Value::Str("GET".into())),
            ("http.status_code", Value::Int(200)), // string-encoded int
            ("sampling.ratio", Value::Float(0.25)),
            ("cache.hit", Value::Bool(true)),
        ];
        for (name, want) in root_span {
            assert_eq!(column(&cols, name).cells[0], want, "{name}");
        }
        // The child span carries none of these attributes, so its cell is null.
        assert_eq!(column(&cols, "http.method").cells[1], Value::Null);
    }

    #[test]
    fn decode_anyvalue_units() {
        let cases = [
            // A bare-number intValue is tolerated.
            (serde_json::json!({"intValue": 7}), Value::Int(7)),
            (
                serde_json::json!({"arrayValue": {"values": []}}),
                Value::Str("{\"values\":[]}".into()),
            ),
            (serde_json::json!({}), Value::Null),
            (serde_json::json!("not an object"), Value::Null),
        ];
        for (input, want) in cases {
            assert_eq!(decode_anyvalue(&input), want, "{input}");
        }
    }

    #[test]
    fn malformed_and_non_otlp_error() {
        let inputs: [&[u8]; 2] = [
            b"{not json",      // not JSON at all
            br#"{"foo": 1}"#,  // valid JSON, but not an OTLP trace export
        ];
        for input in inputs {
            let outcome = OtlpParser.parse("-", input);
            assert!(matches!(outcome, Err(AxError::Parse { .. })));
        }
    }

    #[test]
    fn sniff_keys_on_resource_spans() {
        let cases: [(&[u8], bool, &str); 6] = [
            (SAMPLE_TRACE.as_bytes(), true, "full export"),
            (br#"{"resourceSpans": []}"#, true, "empty export"),
            (br#"{"resourceSpans": 1}"#, false, "resourceSpans must be an array"),
            (br#"{"resourceLogs": []}"#, false, "logs, not traces"),
            (b"{\"a\":1}", false, "unrelated JSON"),
            (b"a,b,c\n1,2,3", false, "not even JSON"),
        ];
        for (input, strong, why) in cases {
            assert_eq!(OtlpParser.sniff(input), strong.then_some(STRONG), "{why}");
        }
    }

    #[test]
    fn unix_nano_accepts_string_and_number() {
        let cases: [(Option<serde_json::Value>, Option<i64>); 4] = [
            (Some(serde_json::json!("123")), Some(123)), // string-encoded
            (Some(serde_json::json!(456)), Some(456)),   // bare number
            (Some(serde_json::json!(true)), None),       // neither
            (None, None),
        ];
        for (input, want) in cases {
            assert_eq!(unix_nano(input.as_ref()), want);
        }
    }

    #[test]
    fn claims_otlp_extension() {
        assert_eq!(OtlpParser.extensions(), ["otlp"].as_slice());
    }

    #[test]
    fn resolves_by_extension_and_content() {
        let registry = crate::parser::ParserRegistry::default();
        // A named `.otlp` file resolves by extension. On stdin ("-") the
        // resourceSpans signature must beat the generic JSON sniff.
        for source in ["dump.otlp", "-"] {
            let chosen = registry.resolve(source, SAMPLE_TRACE.as_bytes()).unwrap();
            assert_eq!(chosen.id(), "otlp", "resolving {source}");
        }
    }
}