anomalyx_normalize/parsers/
logfmt.rs1use crate::infer;
10use crate::parser::{Confidence, FormatParser, TEXT};
11use crate::table::TableBuilder;
12use ax_core::{AxError, Column, Value};
13use std::collections::BTreeMap;
14
/// Stateless parser for logfmt text (`key=value` pairs separated by whitespace).
///
/// The type carries no data; all behaviour lives in its `FormatParser` impl.
#[derive(Clone, Debug, Default)]
pub struct LogfmtParser;
17
/// Reports whether `s` is shaped like a logfmt key.
///
/// A key must be non-empty, begin with an ASCII letter or `_`, and consist
/// solely of ASCII letters, ASCII digits, `_`, `.` and `-`.
fn is_key(s: &str) -> bool {
    // The leading character has a stricter rule than the rest: no digits,
    // dots or dashes. An empty string has no leading character and is rejected.
    let starts_ok = match s.chars().next() {
        Some(first) => first == '_' || first.is_ascii_alphabetic(),
        None => false,
    };
    if !starts_ok {
        return false;
    }
    s.chars()
        .all(|ch| matches!(ch, '_' | '.' | '-') || ch.is_ascii_alphanumeric())
}
26
27fn parse_line(line: &str) -> BTreeMap<String, Value> {
29 let mut out = BTreeMap::new();
30 let mut chars = line.chars().peekable();
31 loop {
32 while chars.peek() == Some(&' ') {
33 chars.next();
34 }
35 if chars.peek().is_none() {
36 break;
37 }
38 let mut key = String::new();
40 while let Some(&c) = chars.peek() {
41 if c == '=' || c == ' ' {
42 break;
43 }
44 key.push(c);
45 chars.next();
46 }
47 if chars.peek() == Some(&'=') {
48 chars.next(); let value = if chars.peek() == Some(&'"') {
50 chars.next(); let mut s = String::new();
52 while let Some(c) = chars.next() {
53 match c {
54 '\\' => {
55 if let Some(esc) = chars.next() {
56 s.push(esc); }
58 }
59 '"' => break,
60 _ => s.push(c),
61 }
62 }
63 Value::Str(s) } else {
65 let mut raw = String::new();
66 while let Some(&c) = chars.peek() {
67 if c == ' ' {
68 break;
69 }
70 raw.push(c);
71 chars.next();
72 }
73 if raw.is_empty() {
74 Value::Null } else {
76 infer::infer_scalar(&raw)
77 }
78 };
79 out.insert(key, value);
80 } else {
81 out.insert(key, Value::Bool(true));
83 }
84 }
85 out
86}
87
88impl FormatParser for LogfmtParser {
89 fn id(&self) -> &'static str {
90 "logfmt"
91 }
92 fn extensions(&self) -> &'static [&'static str] {
93 &["logfmt"]
94 }
95 fn sniff(&self, bytes: &[u8]) -> Option<Confidence> {
96 let text = std::str::from_utf8(bytes).ok()?;
97 let line = text.lines().find(|l| !l.trim().is_empty())?;
98 let tokens: Vec<&str> = line.split_whitespace().collect();
102 if tokens.len() < 2 {
103 return None;
104 }
105 let kv = tokens
106 .iter()
107 .filter(|t| matches!(t.split_once('='), Some((k, _)) if is_key(k)))
108 .count();
109 (kv >= 1 && kv * 2 >= tokens.len()).then_some(TEXT)
110 }
111 fn parse(&self, _source: &str, bytes: &[u8]) -> Result<Vec<Column>, AxError> {
112 let text = std::str::from_utf8(bytes).map_err(|e| AxError::Parse {
113 format: self.id().to_string(),
114 message: e.to_string(),
115 })?;
116 let mut builder = TableBuilder::new();
117 for line in text.lines() {
118 if line.trim().is_empty() {
119 continue;
120 }
121 builder.push_row(parse_line(line));
122 }
123 Ok(builder.finish())
124 }
125}
126
#[cfg(test)]
mod tests {
    use super::*;
    use ax_core::ColType;

    // Two-record fixture. The second record omits `dur` and `ok` and adds
    // `retries`, so the table ends up with cells that have to be padded.
    const LOG: &str = "level=info msg=\"request handled\" status=200 dur=0.123 ok=true\n\
level=error msg=\"db timeout\" status=500 retries=3\n";

    // Runs the parser over `s`, panicking if parsing fails.
    fn parse(s: &str) -> Vec<Column> {
        let parsed = LogfmtParser.parse("-", s.as_bytes());
        parsed.unwrap()
    }

    // Finds the column called `name`, panicking if there is none.
    fn col<'a>(cols: &'a [Column], name: &str) -> &'a Column {
        let idx = cols.iter().position(|c| c.name == name).unwrap();
        &cols[idx]
    }

    #[test]
    fn parses_typed_values() {
        let table = parse(LOG);
        let expected = [
            ("status", ColType::Int),
            ("dur", ColType::Float),
            ("level", ColType::Str),
        ];
        for (name, ty) in expected {
            assert_eq!(col(&table, name).ty, ty);
        }
        assert_eq!(col(&table, "ok").cells[0], Value::Bool(true));
    }

    #[test]
    fn quoted_values_are_strings_with_spaces() {
        let table = parse(LOG);
        let msg = col(&table, "msg");
        assert_eq!(msg.cells[0], Value::Str("request handled".into()));
        assert_eq!(msg.cells[1], Value::Str("db timeout".into()));
    }

    #[test]
    fn missing_keys_pad_with_null() {
        let table = parse(LOG);
        let retries = col(&table, "retries");
        assert_eq!(retries.cells[0], Value::Null);
        assert_eq!(retries.cells[1], Value::Int(3));
        assert_eq!(col(&table, "dur").null_count(), 1);
    }

    #[test]
    fn quote_escapes() {
        let table = parse("msg=\"say \\\"hi\\\" now\" path=\"a\\\\b\"\n");
        let msg = &col(&table, "msg").cells[0];
        let path = &col(&table, "path").cells[0];
        assert_eq!(*msg, Value::Str("say \"hi\" now".into()));
        assert_eq!(*path, Value::Str("a\\b".into()));
    }

    #[test]
    fn bare_flag_and_empty_value() {
        let table = parse("debug status= name=x\n");
        let first = |name: &str| &col(&table, name).cells[0];
        assert_eq!(*first("debug"), Value::Bool(true));
        assert_eq!(*first("status"), Value::Null);
        assert_eq!(*first("name"), Value::Str("x".into()));
    }

    #[test]
    fn is_key_classification() {
        for accepted in ["level", "id.orig_h", "_x-1"] {
            assert!(is_key(accepted));
        }
        for rejected in ["1abc", "", "a b"] {
            assert!(!is_key(rejected));
        }
    }

    #[test]
    fn sniff_recognizes_logfmt() {
        assert_eq!(LogfmtParser.sniff(LOG.as_bytes()), Some(TEXT));
        assert_eq!(LogfmtParser.sniff(b"level=info status=200"), Some(TEXT));

        let rejected: [&[u8]; 4] = [
            b"a=1 b c",
            b"a,b,c\n1,2,3",
            b"just some prose words",
            b"single=token",
        ];
        for input in rejected {
            assert_eq!(LogfmtParser.sniff(input), None);
        }
    }

    #[test]
    fn resolves_by_extension_and_content() {
        let registry = crate::parser::ParserRegistry::default();

        let by_extension = registry.resolve("app.logfmt", b"x").unwrap();
        assert_eq!(by_extension.id(), "logfmt");

        let by_content = registry.resolve("app.log", LOG.as_bytes()).unwrap();
        assert_eq!(
            by_content.id(),
            "logfmt",
            "content sniff wins for a .log file"
        );
    }
}