// anomalyx_normalize/parsers/prometheus.rs
use crate::parser::{Confidence, FormatParser, STRONG};
use crate::table::TableBuilder;
use ax_core::{AxError, Column, Value};
use std::collections::btree_map::Entry;
use std::collections::BTreeMap;
19
/// Stateless parser for Prometheus text-format scrapes.
///
/// See the `sniff` and `parse` implementations for exactly which inputs are
/// claimed and how samples are flattened into columns.
#[derive(Clone, Debug, Default)]
pub struct PrometheusParser;
22
/// Returns `true` if `c` may begin a name: an ASCII letter, `_`, or `:`
/// (`[A-Za-z_:]`).
fn is_name_start(c: char) -> bool {
    matches!(c, 'A'..='Z' | 'a'..='z' | '_' | ':')
}
/// Returns `true` if `c` may appear anywhere after the first character of a
/// name: an ASCII letter or digit, `_`, or `:` (`[A-Za-z0-9_:]`).
fn is_name_char(c: char) -> bool {
    c.is_ascii_alphanumeric() || matches!(c, '_' | ':')
}
32
33#[derive(Debug)]
35struct Sample {
36 name: String,
37 labels: Vec<(String, String)>,
38 value: Value,
39 timestamp: Option<i64>,
40}
41
/// A forward-only cursor over the characters of a single exposition line.
struct Cursor<'a> {
    /// The line, pre-split into `char`s so that `pos` counts whole characters.
    chars: &'a [char],
    /// Index of the next unread character in `chars`.
    pos: usize,
}
47
48impl Cursor<'_> {
49 fn peek(&self) -> Option<char> {
50 self.chars.get(self.pos).copied()
51 }
52 fn bump(&mut self) -> Option<char> {
53 let c = self.peek();
54 if c.is_some() {
55 self.pos += 1;
56 }
57 c
58 }
59 fn skip_ws(&mut self) {
60 while matches!(self.peek(), Some(' ') | Some('\t')) {
61 self.pos += 1;
62 }
63 }
64 fn read_name(&mut self) -> Option<String> {
66 match self.peek() {
67 Some(c) if is_name_start(c) => {}
68 _ => return None,
69 }
70 let mut s = String::new();
71 while let Some(c) = self.peek() {
72 if is_name_char(c) {
73 s.push(c);
74 self.bump();
75 } else {
76 break;
77 }
78 }
79 Some(s)
80 }
81 fn read_token(&mut self) -> String {
83 let mut s = String::new();
84 while let Some(c) = self.peek() {
85 if c == ' ' || c == '\t' {
86 break;
87 }
88 s.push(c);
89 self.bump();
90 }
91 s
92 }
93}
94
95fn read_labels(cur: &mut Cursor) -> Result<Vec<(String, String)>, String> {
97 let mut labels = Vec::new();
98 loop {
99 cur.skip_ws();
100 match cur.peek() {
101 Some('}') => {
102 cur.bump();
103 return Ok(labels);
104 }
105 None => return Err("unterminated label set".into()),
106 _ => {}
107 }
108 let name = cur.read_name().ok_or("expected label name")?;
109 cur.skip_ws();
110 if cur.bump() != Some('=') {
111 return Err("expected '=' after label name".into());
112 }
113 cur.skip_ws();
114 if cur.bump() != Some('"') {
115 return Err("expected '\"' to open label value".into());
116 }
117 labels.push((name, read_label_value(cur)?));
118 cur.skip_ws();
119 match cur.bump() {
120 Some(',') => continue,
121 Some('}') => return Ok(labels),
122 _ => return Err("expected ',' or '}' after label".into()),
123 }
124 }
125}
126
127fn read_label_value(cur: &mut Cursor) -> Result<String, String> {
130 let mut s = String::new();
131 loop {
132 match cur.bump() {
133 None => return Err("unterminated label value".into()),
134 Some('"') => return Ok(s),
135 Some('\\') => match cur.bump() {
136 Some('\\') => s.push('\\'),
137 Some('"') => s.push('"'),
138 Some('n') => s.push('\n'),
139 Some(other) => s.push(other),
140 None => return Err("dangling escape in label value".into()),
141 },
142 Some(c) => s.push(c),
143 }
144 }
145}
146
147fn parse_value(tok: &str) -> Result<Value, String> {
149 match tok.parse::<f64>() {
150 Ok(f) if f.is_finite() => Ok(Value::Float(f)),
151 Ok(_) => Ok(Value::Null),
152 Err(_) => Err(format!("invalid metric value '{tok}'")),
153 }
154}
155
156fn parse_sample(line: &str) -> Result<Sample, String> {
158 let chars: Vec<char> = line.chars().collect();
159 let mut cur = Cursor {
160 chars: &chars,
161 pos: 0,
162 };
163 cur.skip_ws();
164 let name = cur.read_name().ok_or("expected metric name")?;
165 let labels = if cur.peek() == Some('{') {
166 cur.bump();
167 read_labels(&mut cur)?
168 } else {
169 Vec::new()
170 };
171 cur.skip_ws();
172 let value_tok = cur.read_token();
173 if value_tok.is_empty() {
174 return Err("missing metric value".into());
175 }
176 let value = parse_value(&value_tok)?;
177 cur.skip_ws();
178 let timestamp = match cur.peek() {
180 None | Some('#') => None,
181 Some(_) => {
182 let ts = cur.read_token();
183 Some(
184 ts.parse::<i64>()
185 .map_err(|_| format!("invalid timestamp '{ts}'"))?,
186 )
187 }
188 };
189 Ok(Sample {
190 name,
191 labels,
192 value,
193 timestamp,
194 })
195}
196
/// Returns `true` if the first non-whitespace character of `line` is `#`.
fn is_comment(line: &str) -> bool {
    line.trim_start().chars().next() == Some('#')
}
201
202impl PrometheusParser {
203 fn err(&self, msg: impl std::fmt::Display) -> AxError {
204 AxError::Parse {
205 format: self.id().to_string(),
206 message: msg.to_string(),
207 }
208 }
209}
210
211impl FormatParser for PrometheusParser {
212 fn id(&self) -> &'static str {
213 "prometheus"
214 }
215 fn extensions(&self) -> &'static [&'static str] {
216 &["prom"]
217 }
218 fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
219 let text = std::str::from_utf8(bytes).ok()?;
220 for line in text.lines() {
221 if line.trim().is_empty() {
222 continue;
223 }
224 if is_comment(line) {
225 let rest = line.trim_start().trim_start_matches('#').trim_start();
228 if rest.starts_with("HELP ")
229 || rest.starts_with("TYPE ")
230 || rest.starts_with("UNIT ")
231 || rest == "EOF"
232 {
233 return Some(STRONG);
234 }
235 continue;
236 }
237 return parse_sample(line).ok().map(|_| STRONG);
240 }
241 None
242 }
243 fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
244 let text = std::str::from_utf8(bytes).map_err(|e| self.err(e))?;
245 let mut builder = TableBuilder::new();
246 for line in text.lines() {
247 if line.trim().is_empty() || is_comment(line) {
248 continue;
249 }
250 let sample = parse_sample(line).map_err(|m| self.err(m))?;
251 let mut row: BTreeMap<String, Value> = BTreeMap::new();
252 for (k, v) in sample.labels {
253 row.insert(k, Value::Str(v));
254 }
255 if let Some(ts) = sample.timestamp {
256 row.insert("timestamp".into(), Value::Int(ts));
257 }
258 row.insert(sample.name, sample.value);
261 builder.push_row(row);
262 }
263 Ok(builder.finish())
264 }
265}
266
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    /// A representative scrape. It has HELP/TYPE metadata, labelled and
    /// unlabelled samples, explicit timestamps on some lines, a free-form
    /// comment, and a closing `# EOF`.
    const SCRAPE: &str = "\
        # HELP http_requests_total The total number of HTTP requests.\n\
        # TYPE http_requests_total counter\n\
        http_requests_total{method=\"post\",code=\"200\"} 1027 1395066363000\n\
        http_requests_total{method=\"post\",code=\"400\"} 3 1395066363000\n\
        # a plain comment\n\
        metric_without_labels 12.47\n\
        go_gc_duration_seconds{quantile=\"0.5\"} 0.0001\n\
        # EOF\n";

    /// Parses `s`, panicking on any error.
    fn parse(s: &str) -> Vec<Column> {
        PrometheusParser.parse("-", s.as_bytes()).unwrap()
    }

    /// Finds a column by name, panicking with the name if it is absent.
    fn col<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        cols.iter()
            .find(|c| c.name == name)
            .unwrap_or_else(|| panic!("missing column {name}"))
    }

    /// Shorthand for sniffing with a fresh parser.
    fn sniff(bytes: &[u8]) -> Option<Confidence> {
        PrometheusParser.sniff(bytes)
    }

    #[test]
    fn pivots_metric_name_into_value_column() {
        let cols = parse(SCRAPE);
        let reqs = col(&cols, "http_requests_total");
        assert_eq!(reqs.ty, ColType::Float);
        assert_eq!(reqs.cells.len(), 4, "one row per sample line");
        assert_eq!(reqs.cells[0], Value::Float(1027.0));
        assert_eq!(reqs.cells[1], Value::Float(3.0));
        assert_eq!(reqs.cells[2], Value::Null);
        assert_eq!(reqs.cells[3], Value::Null);

        let unlabelled = col(&cols, "metric_without_labels");
        assert_eq!(unlabelled.cells[2], Value::Float(12.47));

        let gc = col(&cols, "go_gc_duration_seconds");
        assert_eq!(gc.cells[3], Value::Float(0.0001));
    }

    #[test]
    fn labels_become_string_columns() {
        let cols = parse(SCRAPE);
        let method = col(&cols, "method");
        assert_eq!(method.ty, ColType::Str);
        assert_eq!(method.cells[0], Value::Str("post".into()));
        assert_eq!(method.cells[2], Value::Null, "unlabeled sample → null");
        assert_eq!(col(&cols, "code").cells[1], Value::Str("400".into()));
        assert_eq!(col(&cols, "quantile").cells[3], Value::Str("0.5".into()));
    }

    #[test]
    fn timestamp_is_an_int_column_padded_with_null() {
        let cols = parse(SCRAPE);
        let ts = col(&cols, "timestamp");
        assert_eq!(ts.ty, ColType::Int);
        assert_eq!(ts.cells[0], Value::Int(1_395_066_363_000));
        assert_eq!(ts.cells[2], Value::Null, "no timestamp on that sample");
    }

    #[test]
    fn label_value_escapes_decode() {
        let cols = parse("m{path=\"C:\\\\DirA\",err=\"a\\\"b\\nc\"} 1\n");
        assert_eq!(col(&cols, "path").cells[0], Value::Str("C:\\DirA".into()));
        assert_eq!(col(&cols, "err").cells[0], Value::Str("a\"b\nc".into()));
        assert_eq!(col(&cols, "m").cells[0], Value::Float(1.0));
    }

    #[test]
    fn non_finite_values_are_null() {
        let cols = parse("a +Inf\nb -Inf\nc NaN\n");
        assert_eq!(col(&cols, "a").cells[0], Value::Null);
        assert_eq!(col(&cols, "b").cells[1], Value::Null);
        assert_eq!(col(&cols, "c").cells[2], Value::Null);
    }

    #[test]
    fn names_allow_colon_underscore_and_digits() {
        let metric_cols = parse(":_m1 5\n");
        assert_eq!(col(&metric_cols, ":_m1").cells[0], Value::Float(5.0));

        let label_cols = parse("x{_l1=\"v\"} 1\n");
        assert_eq!(col(&label_cols, "_l1").cells[0], Value::Str("v".into()));
    }

    #[test]
    fn trailing_comma_in_labels_is_accepted() {
        let cols = parse("m{a=\"1\",} 2\n");
        assert_eq!(col(&cols, "a").cells[0], Value::Str("1".into()));
        assert_eq!(col(&cols, "m").cells[0], Value::Float(2.0));
    }

    #[test]
    fn malformed_lines_error() {
        let bad: [&[u8]; 6] = [
            b"1bad 5\n",              // name cannot start with a digit
            b"http_requests_total\n", // no value
            b"foo abc\n",             // value is not a number
            b"foo{a=1} 5\n",          // label value is not quoted
            b"foo 1 1.5\n",           // timestamp is not an integer
            b"foo{a=\"1\" 5\n",       // label set never closed
        ];
        for input in bad {
            assert!(
                PrometheusParser.parse("-", input).is_err(),
                "{}",
                String::from_utf8_lossy(input)
            );
        }
    }

    #[test]
    fn sniff_recognizes_exposition() {
        let claimed: [&[u8]; 6] = [
            SCRAPE.as_bytes(),
            b"node_cpu_seconds_total 42.5\n",
            b"# HELP foo bar\n",
            b"# TYPE foo counter\n",
            b"# UNIT foo bytes\n",
            b"# EOF\n",
        ];
        for input in claimed {
            assert_eq!(
                sniff(input),
                Some(STRONG),
                "{}",
                String::from_utf8_lossy(input)
            );
        }

        let rejected: [&[u8]; 4] = [
            b"# just a note\n",
            b"a,b,c\n1,2,3",
            b"k=1 v=2\n",
            b"{\"a\":1}\n",
        ];
        for input in rejected {
            assert_eq!(sniff(input), None, "{}", String::from_utf8_lossy(input));
        }
    }

    #[test]
    fn comment_then_sample_sniffs_via_the_sample() {
        assert_eq!(sniff(b"# a note\nfoo_total 3\n"), Some(STRONG));
        assert_eq!(sniff(b"# a note\nnonsense !!!\n"), None);
    }

    #[test]
    fn unterminated_label_set_has_its_own_diagnostic() {
        for line in ["m{", "m{a=\"1\","] {
            assert_eq!(
                parse_sample(line).unwrap_err(),
                "unterminated label set",
                "{line}"
            );
        }
    }

    #[test]
    fn claims_the_prom_extension() {
        assert_eq!(PrometheusParser.extensions(), &["prom"]);
    }

    #[test]
    fn resolves_by_extension_and_content() {
        let reg = crate::parser::ParserRegistry::default();
        let by_extension = reg.resolve("node.prom", b"# HELP x y\n").unwrap();
        assert_eq!(by_extension.id(), "prometheus");
        let by_content = reg.resolve("-", SCRAPE.as_bytes()).unwrap();
        assert_eq!(by_content.id(), "prometheus");
    }
}