Skip to main content

anomalyx_normalize/parsers/
prometheus.rs

//! Prometheus / OpenMetrics text exposition parser — scrape dumps.
//!
//! Each sample line is `name{label="v",...} value [timestamp]`. We pivot the
//! exposition into a row-per-sample table where **the metric name is its own
//! value column** (so a per-series `point` spike is visible), every label key is
//! its own `Str` column, and the optional millisecond `timestamp` is one more
//! column. A sample that omits a given metric/label leaves that cell `Null`
//! (honest absence). Non-finite values (`+Inf`, `-Inf`, `NaN`) become `Null` —
//! they cannot enter a deterministic numeric reduction.
//!
//! `# HELP` / `# TYPE` / `# UNIT` / `# EOF` and plain `#` comments are metadata
//! and produce no rows. Detected by the exposition shape; claims the `.prom`
//! extension (the node_exporter textfile convention).
14
15use crate::parser::{Confidence, FormatParser, STRONG};
16use crate::table::TableBuilder;
17use ax_core::{AxError, Column, Value};
18use std::collections::BTreeMap;
19
/// Stateless parser for the Prometheus / OpenMetrics text exposition format.
///
/// The type carries no data; all behaviour lives in its inherent and trait
/// `impl` blocks.
#[derive(Clone, Debug, Default)]
pub struct PrometheusParser;
22
/// Can `c` open a metric or label name?
///
/// Accepted: an ASCII letter, `_`, or `:`. The same predicate is used for both
/// metric and label names; label names in practice never contain `:`, so the
/// shared, slightly lenient rule is harmless.
fn is_name_start(c: char) -> bool {
    matches!(c, 'a'..='z' | 'A'..='Z' | '_' | ':')
}
29fn is_name_char(c: char) -> bool {
30    is_name_start(c) || c.is_ascii_digit()
31}
32
33/// A parsed sample line.
34#[derive(Debug)]
35struct Sample {
36    name: String,
37    labels: Vec<(String, String)>,
38    value: Value,
39    timestamp: Option<i64>,
40}
41
42/// A character cursor over one line.
43struct Cursor<'a> {
44    chars: &'a [char],
45    pos: usize,
46}
47
48impl Cursor<'_> {
49    fn peek(&self) -> Option<char> {
50        self.chars.get(self.pos).copied()
51    }
52    fn bump(&mut self) -> Option<char> {
53        let c = self.peek();
54        if c.is_some() {
55            self.pos += 1;
56        }
57        c
58    }
59    fn skip_ws(&mut self) {
60        while matches!(self.peek(), Some(' ') | Some('\t')) {
61            self.pos += 1;
62        }
63    }
64    /// Reads a `name`-shaped identifier; `None` if the next char can't start one.
65    fn read_name(&mut self) -> Option<String> {
66        match self.peek() {
67            Some(c) if is_name_start(c) => {}
68            _ => return None,
69        }
70        let mut s = String::new();
71        while let Some(c) = self.peek() {
72            if is_name_char(c) {
73                s.push(c);
74                self.bump();
75            } else {
76                break;
77            }
78        }
79        Some(s)
80    }
81    /// Reads a run of non-whitespace characters (value / timestamp token).
82    fn read_token(&mut self) -> String {
83        let mut s = String::new();
84        while let Some(c) = self.peek() {
85            if c == ' ' || c == '\t' {
86                break;
87            }
88            s.push(c);
89            self.bump();
90        }
91        s
92    }
93}
94
95/// Parses a `{...}` label set; the cursor is positioned just after `{`.
96fn read_labels(cur: &mut Cursor) -> Result<Vec<(String, String)>, String> {
97    let mut labels = Vec::new();
98    loop {
99        cur.skip_ws();
100        match cur.peek() {
101            Some('}') => {
102                cur.bump();
103                return Ok(labels);
104            }
105            None => return Err("unterminated label set".into()),
106            _ => {}
107        }
108        let name = cur.read_name().ok_or("expected label name")?;
109        cur.skip_ws();
110        if cur.bump() != Some('=') {
111            return Err("expected '=' after label name".into());
112        }
113        cur.skip_ws();
114        if cur.bump() != Some('"') {
115            return Err("expected '\"' to open label value".into());
116        }
117        labels.push((name, read_label_value(cur)?));
118        cur.skip_ws();
119        match cur.bump() {
120            Some(',') => continue,
121            Some('}') => return Ok(labels),
122            _ => return Err("expected ',' or '}' after label".into()),
123        }
124    }
125}
126
127/// Reads a quoted label value (cursor just after the opening `"`), decoding the
128/// three Prometheus escapes `\\`, `\"`, `\n`.
129fn read_label_value(cur: &mut Cursor) -> Result<String, String> {
130    let mut s = String::new();
131    loop {
132        match cur.bump() {
133            None => return Err("unterminated label value".into()),
134            Some('"') => return Ok(s),
135            Some('\\') => match cur.bump() {
136                Some('\\') => s.push('\\'),
137                Some('"') => s.push('"'),
138                Some('n') => s.push('\n'),
139                Some(other) => s.push(other),
140                None => return Err("dangling escape in label value".into()),
141            },
142            Some(c) => s.push(c),
143        }
144    }
145}
146
147/// A metric value is a float64; non-finite (`+Inf`/`-Inf`/`NaN`) maps to `Null`.
148fn parse_value(tok: &str) -> Result<Value, String> {
149    match tok.parse::<f64>() {
150        Ok(f) if f.is_finite() => Ok(Value::Float(f)),
151        Ok(_) => Ok(Value::Null),
152        Err(_) => Err(format!("invalid metric value '{tok}'")),
153    }
154}
155
156/// Parses one non-comment sample line.
157fn parse_sample(line: &str) -> Result<Sample, String> {
158    let chars: Vec<char> = line.chars().collect();
159    let mut cur = Cursor {
160        chars: &chars,
161        pos: 0,
162    };
163    cur.skip_ws();
164    let name = cur.read_name().ok_or("expected metric name")?;
165    let labels = if cur.peek() == Some('{') {
166        cur.bump();
167        read_labels(&mut cur)?
168    } else {
169        Vec::new()
170    };
171    cur.skip_ws();
172    let value_tok = cur.read_token();
173    if value_tok.is_empty() {
174        return Err("missing metric value".into());
175    }
176    let value = parse_value(&value_tok)?;
177    cur.skip_ws();
178    // An optional timestamp, unless an OpenMetrics exemplar (`# ...`) follows.
179    let timestamp = match cur.peek() {
180        None | Some('#') => None,
181        Some(_) => {
182            let ts = cur.read_token();
183            Some(
184                ts.parse::<i64>()
185                    .map_err(|_| format!("invalid timestamp '{ts}'"))?,
186            )
187        }
188    };
189    Ok(Sample {
190        name,
191        labels,
192        value,
193        timestamp,
194    })
195}
196
/// Reports whether `line` is a `#` line, i.e. its first non-whitespace
/// character is `#`. Such lines are metadata or comments, never samples.
fn is_comment(line: &str) -> bool {
    line.trim_start().chars().next() == Some('#')
}
201
202impl PrometheusParser {
203    fn err(&self, msg: impl std::fmt::Display) -> AxError {
204        AxError::Parse {
205            format: self.id().to_string(),
206            message: msg.to_string(),
207        }
208    }
209}
210
211impl FormatParser for PrometheusParser {
212    fn id(&self) -> &'static str {
213        "prometheus"
214    }
215    fn extensions(&self) -> &'static [&'static str] {
216        &["prom"]
217    }
218    fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
219        let text = std::str::from_utf8(bytes).ok()?;
220        for line in text.lines() {
221            if line.trim().is_empty() {
222                continue;
223            }
224            if is_comment(line) {
225                // A HELP/TYPE/UNIT/EOF directive is a decisive signal; a plain
226                // comment is inconclusive, so keep scanning.
227                let rest = line.trim_start().trim_start_matches('#').trim_start();
228                if rest.starts_with("HELP ")
229                    || rest.starts_with("TYPE ")
230                    || rest.starts_with("UNIT ")
231                    || rest == "EOF"
232                {
233                    return Some(STRONG);
234                }
235                continue;
236            }
237            // The first real (non-comment) line decides: a parseable sample is
238            // the exposition format; anything else clearly is not.
239            return parse_sample(line).ok().map(|_| STRONG);
240        }
241        None
242    }
243    fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
244        let text = std::str::from_utf8(bytes).map_err(|e| self.err(e))?;
245        let mut builder = TableBuilder::new();
246        for line in text.lines() {
247            if line.trim().is_empty() || is_comment(line) {
248                continue;
249            }
250            let sample = parse_sample(line).map_err(|m| self.err(m))?;
251            let mut row: BTreeMap<String, Value> = BTreeMap::new();
252            for (k, v) in sample.labels {
253                row.insert(k, Value::Str(v));
254            }
255            if let Some(ts) = sample.timestamp {
256                row.insert("timestamp".into(), Value::Int(ts));
257            }
258            // The metric name is its own value column; inserted last so it wins
259            // any (pathological) collision with a label named the same.
260            row.insert(sample.name, sample.value);
261            builder.push_row(row);
262        }
263        Ok(builder.finish())
264    }
265}
266
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// Fixture scrape: two directives, two labelled samples with timestamps, a
    /// plain comment, an unlabelled sample, a one-label sample, and `# EOF`.
    /// It yields four rows, in that sample order.
    const SCRAPE: &str = concat!(
        "# HELP http_requests_total The total number of HTTP requests.\n",
        "# TYPE http_requests_total counter\n",
        "http_requests_total{method=\"post\",code=\"200\"} 1027 1395066363000\n",
        "http_requests_total{method=\"post\",code=\"400\"} 3 1395066363000\n",
        "# a plain comment\n",
        "metric_without_labels 12.47\n",
        "go_gc_duration_seconds{quantile=\"0.5\"} 0.0001\n",
        "# EOF\n",
    );

    /// Parses `text` with the Prometheus parser, panicking on failure.
    fn table_of(text: &str) -> Vec<Column> {
        PrometheusParser.parse("-", text.as_bytes()).unwrap()
    }

    /// Looks a column up by name, panicking if it is absent.
    fn column<'a>(table: &'a [Column], wanted: &str) -> &'a Column {
        match table.iter().find(|c| c.name == wanted) {
            Some(found) => found,
            None => panic!("missing column {wanted}"),
        }
    }

    #[test]
    fn pivots_metric_name_into_value_column() {
        let table = table_of(SCRAPE);
        let requests = column(&table, "http_requests_total");
        assert_eq!(requests.ty, ColType::Float);
        assert_eq!(requests.cells.len(), 4, "one row per sample line");
        // Rows 0–1 are this metric's samples; rows from other metrics leave the
        // column null — per-series isolation.
        let expected = [
            Value::Float(1027.0),
            Value::Float(3.0),
            Value::Null,
            Value::Null,
        ];
        for (row, want) in expected.iter().enumerate() {
            assert_eq!(requests.cells[row], *want);
        }

        let unlabeled = column(&table, "metric_without_labels");
        assert_eq!(unlabeled.cells[2], Value::Float(12.47));
        let gc = column(&table, "go_gc_duration_seconds");
        assert_eq!(gc.cells[3], Value::Float(0.0001));
    }

    #[test]
    fn labels_become_string_columns() {
        let table = table_of(SCRAPE);
        let method = column(&table, "method");
        assert_eq!(method.ty, ColType::Str);
        assert_eq!(method.cells[0], Value::Str("post".into()));
        assert_eq!(method.cells[2], Value::Null, "unlabeled sample → null");

        let code = column(&table, "code");
        assert_eq!(code.cells[1], Value::Str("400".into()));
        let quantile = column(&table, "quantile");
        assert_eq!(quantile.cells[3], Value::Str("0.5".into()));
    }

    #[test]
    fn timestamp_is_an_int_column_padded_with_null() {
        let table = table_of(SCRAPE);
        let stamps = column(&table, "timestamp");
        assert_eq!(stamps.ty, ColType::Int);
        assert_eq!(stamps.cells[0], Value::Int(1_395_066_363_000));
        assert_eq!(stamps.cells[2], Value::Null, "no timestamp on that sample");
    }

    #[test]
    fn label_value_escapes_decode() {
        let table = table_of("m{path=\"C:\\\\DirA\",err=\"a\\\"b\\nc\"} 1\n");
        let path = column(&table, "path");
        assert_eq!(path.cells[0], Value::Str("C:\\DirA".into()));
        let err = column(&table, "err");
        assert_eq!(err.cells[0], Value::Str("a\"b\nc".into()));
        let metric = column(&table, "m");
        assert_eq!(metric.cells[0], Value::Float(1.0));
    }

    #[test]
    fn non_finite_values_are_null() {
        let table = table_of("a +Inf\nb -Inf\nc NaN\n");
        // Each metric occupies its own row, in source order.
        for (row, name) in ["a", "b", "c"].iter().enumerate() {
            assert_eq!(column(&table, name).cells[row], Value::Null);
        }
    }

    #[test]
    fn names_allow_colon_underscore_and_digits() {
        // Colon start, underscore, and a trailing digit all in one name.
        let metric_table = table_of(":_m1 5\n");
        assert_eq!(column(&metric_table, ":_m1").cells[0], Value::Float(5.0));
        // A label name with underscore start and a digit.
        let label_table = table_of("x{_l1=\"v\"} 1\n");
        assert_eq!(
            column(&label_table, "_l1").cells[0],
            Value::Str("v".into())
        );
    }

    #[test]
    fn trailing_comma_in_labels_is_accepted() {
        let table = table_of("m{a=\"1\",} 2\n");
        assert_eq!(column(&table, "a").cells[0], Value::Str("1".into()));
        assert_eq!(column(&table, "m").cells[0], Value::Float(2.0));
    }

    #[test]
    fn malformed_lines_error() {
        let cases = [
            (&b"1bad 5\n"[..], "digit-leading name"),
            (&b"http_requests_total\n"[..], "missing value"),
            (&b"foo abc\n"[..], "non-numeric value"),
            (&b"foo{a=1} 5\n"[..], "unquoted label value"),
            (&b"foo 1 1.5\n"[..], "non-integer timestamp"),
            (&b"foo{a=\"1\" 5\n"[..], "unterminated label set"),
        ];
        for &(input, why) in cases.iter() {
            assert!(PrometheusParser.parse("-", input).is_err(), "{why}");
        }
    }

    #[test]
    fn sniff_recognizes_exposition() {
        assert_eq!(PrometheusParser.sniff(SCRAPE.as_bytes()), Some(STRONG));

        // A bare sample line (no HELP/TYPE) still sniffs, and each
        // HELP/TYPE/UNIT/EOF directive alone is decisive.
        let accepted = [
            &b"node_cpu_seconds_total 42.5\n"[..],
            &b"# HELP foo bar\n"[..],
            &b"# TYPE foo counter\n"[..],
            &b"# UNIT foo bytes\n"[..],
            &b"# EOF\n"[..],
        ];
        for &input in accepted.iter() {
            assert_eq!(PrometheusParser.sniff(input), Some(STRONG));
        }

        // A plain comment is inconclusive on its own, and other formats
        // (CSV, logfmt, JSON) are rejected.
        let rejected = [
            &b"# just a note\n"[..],
            &b"a,b,c\n1,2,3"[..],
            &b"k=1 v=2\n"[..],
            &b"{\"a\":1}\n"[..],
        ];
        for &input in rejected.iter() {
            assert_eq!(PrometheusParser.sniff(input), None);
        }
    }

    #[test]
    fn comment_then_sample_sniffs_via_the_sample() {
        let good = PrometheusParser.sniff(b"# a note\nfoo_total 3\n");
        assert_eq!(good, Some(STRONG));
        let bad = PrometheusParser.sniff(b"# a note\nnonsense !!!\n");
        assert_eq!(bad, None);
    }

    #[test]
    fn unterminated_label_set_has_its_own_diagnostic() {
        // Running off the end at the top of the label loop (vs. a bad token
        // mid-pair) is a distinct error — pins the dedicated `None` arm.
        for line in ["m{", "m{a=\"1\","].iter() {
            assert_eq!(parse_sample(line).unwrap_err(), "unterminated label set");
        }
    }

    #[test]
    fn claims_the_prom_extension() {
        assert_eq!(PrometheusParser.extensions(), &["prom"]);
    }

    #[test]
    fn resolves_by_extension_and_content() {
        let registry = crate::parser::ParserRegistry::default();
        let by_extension = registry.resolve("node.prom", b"# HELP x y\n").unwrap();
        assert_eq!(by_extension.id(), "prometheus");
        // Routed by content sniff without a known extension.
        let by_content = registry.resolve("-", SCRAPE.as_bytes()).unwrap();
        assert_eq!(by_content.id(), "prometheus");
    }
}