1use std::io::{BufRead, Read};
2
3use indexmap::IndexMap;
4use regex::Regex;
5
6use crate::error::DkitError;
7use crate::format::FormatReader;
8use crate::value::Value;
9
/// Predefined log line layouts that ship with a ready-made regex.
///
/// Being a fieldless enum, it also derives `Eq` and `Hash` so it can be used
/// as a map/set key and in exhaustive equality comparisons.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LogFormat {
    /// Apache "combined" layout: the common layout followed by quoted
    /// referer and user-agent fields.
    ApacheCombined,
    /// Apache "common" layout: host, ident, user, bracketed timestamp,
    /// quoted request, three-digit status and byte count.
    ApacheCommon,
    /// Nginx access-log layout: address, literal `-`, user, bracketed time,
    /// quoted request, status, bytes sent, quoted referer and user agent.
    Nginx,
    /// Syslog layout: optional `<priority>`, timestamp, hostname,
    /// application name with optional `[pid]`, then the message.
    Syslog,
}
18
/// Options that tune how a [`LogReader`] processes its input.
#[derive(Debug, Clone)]
pub struct LogReaderOptions {
    /// What to do with a non-blank line that does not match the log pattern.
    pub on_error: LogParseErrorMode,
}
25
26impl Default for LogReaderOptions {
27 fn default() -> Self {
28 Self {
29 on_error: LogParseErrorMode::Skip,
30 }
31 }
32}
33
/// Strategy for handling a non-blank line that does not match the pattern.
///
/// Being a fieldless enum, it also derives `Eq` and `Hash` so it can be used
/// as a map/set key and in exhaustive equality comparisons.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LogParseErrorMode {
    /// Drop the line; it produces no output record.
    Skip,
    /// Keep the line as an object with a single `_raw` key holding the
    /// trimmed line text.
    Raw,
}
42
/// Reader that turns log text into an array of objects, one per matching line.
pub struct LogReader {
    /// Compiled regex plus the field name assigned to each capture group.
    pattern: CompiledPattern,
    /// Behavior switches, notably how unparseable lines are handled.
    options: LogReaderOptions,
}
51
/// A line-matching regex paired with the names of its capture groups.
struct CompiledPattern {
    /// Regex applied to each trimmed log line.
    regex: Regex,
    /// Field names in capture-group order: `field_names[i]` labels group `i + 1`.
    field_names: Vec<String>,
}
56
57impl LogReader {
58 pub fn new(format_str: &str, options: LogReaderOptions) -> anyhow::Result<Self> {
63 let pattern = match format_str.to_lowercase().as_str() {
64 "apache" | "apache-combined" | "combined" => {
65 compile_predefined(LogFormat::ApacheCombined)?
66 }
67 "apache-common" | "common" => compile_predefined(LogFormat::ApacheCommon)?,
68 "nginx" => compile_predefined(LogFormat::Nginx)?,
69 "syslog" => compile_predefined(LogFormat::Syslog)?,
70 _ => compile_custom_pattern(format_str)?,
71 };
72 Ok(Self { pattern, options })
73 }
74
75 fn parse_lines(&self, input: &str) -> anyhow::Result<Value> {
76 let mut items = Vec::new();
77 for line in input.lines() {
78 let trimmed = line.trim();
79 if trimmed.is_empty() {
80 continue;
81 }
82 match self.parse_line(trimmed) {
83 Some(obj) => items.push(obj),
84 None => match self.options.on_error {
85 LogParseErrorMode::Skip => {}
86 LogParseErrorMode::Raw => {
87 let mut map = IndexMap::new();
88 map.insert("_raw".to_string(), Value::String(trimmed.to_string()));
89 items.push(Value::Object(map));
90 }
91 },
92 }
93 }
94 Ok(Value::Array(items))
95 }
96
97 fn parse_line(&self, line: &str) -> Option<Value> {
98 let caps = self.pattern.regex.captures(line)?;
99 let mut map = IndexMap::new();
100 for (i, name) in self.pattern.field_names.iter().enumerate() {
101 let val = caps
102 .get(i + 1)
103 .map(|m| m.as_str().to_string())
104 .unwrap_or_default();
105 if val == "-" {
108 map.insert(name.clone(), Value::Null);
109 } else if let Ok(n) = val.parse::<i64>() {
110 map.insert(name.clone(), Value::Integer(n));
111 } else if let Ok(f) = val.parse::<f64>() {
112 if val.contains('.') {
113 map.insert(name.clone(), Value::Float(f));
114 } else {
115 map.insert(name.clone(), Value::String(val));
116 }
117 } else {
118 map.insert(name.clone(), Value::String(val));
119 }
120 }
121 Some(Value::Object(map))
122 }
123}
124
125impl FormatReader for LogReader {
126 fn read(&self, input: &str) -> anyhow::Result<Value> {
127 self.parse_lines(input)
128 }
129
130 fn read_from_reader(&self, reader: impl Read) -> anyhow::Result<Value> {
131 let buf_reader = std::io::BufReader::new(reader);
132 let mut items = Vec::new();
133 for line_result in buf_reader.lines() {
134 let line = line_result.map_err(|e| DkitError::ParseError {
135 format: "Log".to_string(),
136 source: Box::new(e),
137 })?;
138 let trimmed = line.trim().to_string();
139 if trimmed.is_empty() {
140 continue;
141 }
142 match self.parse_line(&trimmed) {
143 Some(obj) => items.push(obj),
144 None => match self.options.on_error {
145 LogParseErrorMode::Skip => {}
146 LogParseErrorMode::Raw => {
147 let mut map = IndexMap::new();
148 map.insert("_raw".to_string(), Value::String(trimmed));
149 items.push(Value::Object(map));
150 }
151 },
152 }
153 }
154 Ok(Value::Array(items))
155 }
156}
157
158fn compile_predefined(format: LogFormat) -> anyhow::Result<CompiledPattern> {
160 match format {
161 LogFormat::ApacheCombined => {
162 let regex = Regex::new(
166 r#"^(\S+) (\S+) (\S+) \[([^\]]+)\] "([^"]*)" (\d{3}) (\S+) "([^"]*)" "([^"]*)"$"#,
167 )?;
168 Ok(CompiledPattern {
169 regex,
170 field_names: vec![
171 "remote_host".into(),
172 "ident".into(),
173 "remote_user".into(),
174 "timestamp".into(),
175 "request".into(),
176 "status".into(),
177 "bytes".into(),
178 "referer".into(),
179 "user_agent".into(),
180 ],
181 })
182 }
183 LogFormat::ApacheCommon => {
184 let regex = Regex::new(r#"^(\S+) (\S+) (\S+) \[([^\]]+)\] "([^"]*)" (\d{3}) (\S+)$"#)?;
187 Ok(CompiledPattern {
188 regex,
189 field_names: vec![
190 "remote_host".into(),
191 "ident".into(),
192 "remote_user".into(),
193 "timestamp".into(),
194 "request".into(),
195 "status".into(),
196 "bytes".into(),
197 ],
198 })
199 }
200 LogFormat::Nginx => {
201 let regex = Regex::new(
203 r#"^(\S+) - (\S+) \[([^\]]+)\] "([^"]*)" (\d{3}) (\S+) "([^"]*)" "([^"]*)"$"#,
204 )?;
205 Ok(CompiledPattern {
206 regex,
207 field_names: vec![
208 "remote_addr".into(),
209 "remote_user".into(),
210 "time_local".into(),
211 "request".into(),
212 "status".into(),
213 "body_bytes_sent".into(),
214 "http_referer".into(),
215 "http_user_agent".into(),
216 ],
217 })
218 }
219 LogFormat::Syslog => {
220 let regex = Regex::new(
225 r"^(?:<(\d+)>)?(\w{3}\s+\d{1,2}\s+\d{2}:\d{2}:\d{2})\s+(\S+)\s+(\S+?)(?:\[(\d+)\])?:\s+(.+)$",
226 )?;
227 Ok(CompiledPattern {
228 regex,
229 field_names: vec![
230 "priority".into(),
231 "timestamp".into(),
232 "hostname".into(),
233 "app_name".into(),
234 "pid".into(),
235 "message".into(),
236 ],
237 })
238 }
239 }
240}
241
242fn compile_custom_pattern(pattern: &str) -> anyhow::Result<CompiledPattern> {
249 let mut regex_str = String::from("^");
250 let mut field_names = Vec::new();
251 let mut chars = pattern.chars().peekable();
252
253 while let Some(&ch) = chars.peek() {
254 if ch == '{' {
255 chars.next(); let mut name = String::new();
257 loop {
258 match chars.next() {
259 Some('}') => break,
260 Some(c) => name.push(c),
261 None => anyhow::bail!(
262 "Unclosed '{{' in log format pattern. Expected '}}' to close field '{name}'"
263 ),
264 }
265 }
266 if name.is_empty() {
267 anyhow::bail!("Empty field name '{{}}' in log format pattern");
268 }
269 field_names.push(name);
270
271 match chars.peek() {
273 None => {
274 regex_str.push_str("(.+)");
276 }
277 Some(&next_ch) => {
278 if next_ch == '[' || next_ch == '"' {
279 regex_str.push_str("([^");
281 regex_str.push_str(®ex::escape(&next_ch.to_string()));
282 regex_str.push_str("]*)");
283 } else if next_ch == ' ' {
284 regex_str.push_str(r"(\S+)");
286 } else {
287 regex_str.push_str("([^");
289 regex_str.push_str(®ex::escape(&next_ch.to_string()));
290 regex_str.push_str("]+)");
291 }
292 }
293 }
294 } else {
295 chars.next();
296 regex_str.push_str(®ex::escape(&ch.to_string()));
297 }
298 }
299 regex_str.push('$');
300
301 let regex = Regex::new(®ex_str).map_err(|e| {
302 anyhow::anyhow!(
303 "Failed to compile log format pattern into regex: {e}\n Pattern: {pattern}\n Generated regex: {regex_str}"
304 )
305 })?;
306
307 Ok(CompiledPattern { regex, field_names })
308}
309
#[cfg(test)]
mod tests {
    use super::*;

    /// Default reader options (unparseable lines are skipped).
    fn default_opts() -> LogReaderOptions {
        LogReaderOptions::default()
    }

    /// Builds a reader for `format` with `opts` and parses `input`,
    /// panicking if either step fails.
    fn parse_with(format: &str, opts: LogReaderOptions, input: &str) -> Value {
        LogReader::new(format, opts).unwrap().read(input).unwrap()
    }

    /// Same as `parse_with`, using the default options.
    fn parse(format: &str, input: &str) -> Value {
        parse_with(format, default_opts(), input)
    }

    /// Shorthand for an owned string value.
    fn text(s: &str) -> Value {
        Value::String(s.to_string())
    }

    /// Asserts that `parsed` is an array of exactly one record and returns it.
    #[track_caller]
    fn only_record(parsed: &Value) -> &Value {
        let records = parsed.as_array().unwrap();
        assert_eq!(records.len(), 1);
        &records[0]
    }

    /// Asserts that `record` is an object whose `key` holds `expected`.
    #[track_caller]
    fn assert_field(record: &Value, key: &str, expected: Value) {
        let fields = record.as_object().unwrap();
        assert_eq!(fields.get(key), Some(&expected));
    }

    // ---- Apache ----

    #[test]
    fn test_apache_combined_basic() {
        let input = r#"127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326 "http://www.example.com/start.html" "Mozilla/4.08""#;
        let parsed = parse("apache-combined", input);
        let record = only_record(&parsed);
        assert_field(record, "remote_host", text("127.0.0.1"));
        assert_field(record, "remote_user", text("frank"));
        assert_field(record, "status", Value::Integer(200));
        assert_field(record, "bytes", Value::Integer(2326));
        assert_field(record, "user_agent", text("Mozilla/4.08"));
    }

    #[test]
    fn test_apache_combined_alias() {
        // "apache" is an alias for the combined format; "-" fields become null.
        let input = r#"10.0.0.1 - - [01/Jan/2024:00:00:00 +0000] "POST /api HTTP/1.1" 201 512 "-" "curl/7.68""#;
        let parsed = parse("apache", input);
        let record = only_record(&parsed);
        assert_field(record, "ident", Value::Null);
        assert_field(record, "remote_user", Value::Null);
        assert_field(record, "status", Value::Integer(201));
    }

    #[test]
    fn test_apache_common() {
        let input = r#"192.168.1.1 - admin [15/Mar/2024:10:30:00 +0900] "GET /index.html HTTP/1.1" 200 1024"#;
        let parsed = parse("apache-common", input);
        let record = only_record(&parsed);
        assert_field(record, "remote_host", text("192.168.1.1"));
        assert_field(record, "request", text("GET /index.html HTTP/1.1"));
        assert_field(record, "status", Value::Integer(200));
        assert_field(record, "bytes", Value::Integer(1024));
    }

    // ---- Nginx ----

    #[test]
    fn test_nginx() {
        let input = r#"10.0.0.5 - alice [20/Feb/2024:08:15:00 +0000] "GET /api/users HTTP/2.0" 200 4096 "https://example.com" "Mozilla/5.0""#;
        let parsed = parse("nginx", input);
        let record = only_record(&parsed);
        assert_field(record, "remote_addr", text("10.0.0.5"));
        assert_field(record, "remote_user", text("alice"));
        assert_field(record, "status", Value::Integer(200));
    }

    // ---- Syslog ----

    #[test]
    fn test_syslog_with_pid() {
        let input = "Mar 10 13:55:36 myhost sshd[1234]: Accepted publickey for user from 10.0.0.1";
        let parsed = parse("syslog", input);
        let record = only_record(&parsed);
        assert_field(record, "timestamp", text("Mar 10 13:55:36"));
        assert_field(record, "hostname", text("myhost"));
        assert_field(record, "app_name", text("sshd"));
        assert_field(record, "pid", Value::Integer(1234));
        assert_field(
            record,
            "message",
            text("Accepted publickey for user from 10.0.0.1"),
        );
    }

    #[test]
    fn test_syslog_with_priority() {
        let input = "<34>Mar 5 09:00:00 server01 cron[456]: job completed";
        let parsed = parse("syslog", input);
        let record = only_record(&parsed);
        assert_field(record, "priority", Value::Integer(34));
        assert_field(record, "hostname", text("server01"));
    }

    // ---- Custom patterns ----

    #[test]
    fn test_custom_pattern_basic() {
        let input = "2024-01-15T10:30:00 [INFO] Server started successfully";
        let parsed = parse("{timestamp} [{level}] {message}", input);
        let record = only_record(&parsed);
        assert_field(record, "timestamp", text("2024-01-15T10:30:00"));
        assert_field(record, "level", text("INFO"));
        assert_field(record, "message", text("Server started successfully"));
    }

    #[test]
    fn test_custom_pattern_with_delimiters() {
        let input = "10.0.0.1 - admin [2024-01-01 00:00:00] request processed";
        let parsed = parse("{ip} - {user} [{time}] {msg}", input);
        let record = only_record(&parsed);
        assert_field(record, "ip", text("10.0.0.1"));
        assert_field(record, "user", text("admin"));
        assert_field(record, "time", text("2024-01-01 00:00:00"));
    }

    // ---- Unparseable lines ----

    #[test]
    fn test_skip_unparseable_lines() {
        let input = r#"192.168.1.1 - - [15/Mar/2024:10:30:00 +0900] "GET / HTTP/1.1" 200 512
this is not a valid log line
10.0.0.2 - - [15/Mar/2024:10:31:00 +0900] "POST /api HTTP/1.1" 201 256"#;
        let parsed = parse("apache-common", input);
        // The middle line does not match and is dropped in skip mode.
        assert_eq!(parsed.as_array().unwrap().len(), 2);
    }

    #[test]
    fn test_raw_mode_for_unparseable_lines() {
        let opts = LogReaderOptions {
            on_error: LogParseErrorMode::Raw,
        };
        let input = r#"192.168.1.1 - - [15/Mar/2024:10:30:00 +0900] "GET / HTTP/1.1" 200 512
this is not a valid log line"#;
        let parsed = parse_with("apache-common", opts, input);
        let records = parsed.as_array().unwrap();
        assert_eq!(records.len(), 2);
        assert_field(&records[1], "_raw", text("this is not a valid log line"));
    }

    #[test]
    fn test_empty_input() {
        let parsed = parse("apache", "");
        assert!(parsed.as_array().unwrap().is_empty());
    }

    #[test]
    fn test_blank_lines_skipped() {
        let parsed = parse("syslog", "\n\n \n");
        assert!(parsed.as_array().unwrap().is_empty());
    }

    // ---- Multiple lines ----

    #[test]
    fn test_multiple_apache_lines() {
        let input = r#"10.0.0.1 - - [01/Jan/2024:00:00:00 +0000] "GET / HTTP/1.1" 200 1024 "-" "curl/7.68"
10.0.0.2 - user [01/Jan/2024:00:01:00 +0000] "POST /login HTTP/1.1" 302 0 "http://example.com" "Mozilla/5.0"
10.0.0.3 - - [01/Jan/2024:00:02:00 +0000] "GET /favicon.ico HTTP/1.1" 404 0 "-" "Mozilla/5.0""#;
        let parsed = parse("apache", input);
        let records = parsed.as_array().unwrap();
        assert_eq!(records.len(), 3);
        assert_field(&records[2], "status", Value::Integer(404));
    }

    // ---- Streaming reader ----

    #[test]
    fn test_read_from_reader() {
        let reader = LogReader::new("apache-common", default_opts()).unwrap();
        let input = br#"192.168.1.1 - - [15/Mar/2024:10:30:00 +0900] "GET / HTTP/1.1" 200 512"#;
        let parsed = reader.read_from_reader(&input[..]).unwrap();
        assert_eq!(parsed.as_array().unwrap().len(), 1);
    }

    // ---- Pattern errors ----

    #[test]
    fn test_unclosed_brace_error() {
        let outcome = LogReader::new("{timestamp [{level} {message", default_opts());
        assert!(outcome.is_err());
    }

    #[test]
    fn test_empty_field_name_error() {
        let outcome = LogReader::new("{} some text", default_opts());
        assert!(outcome.is_err());
    }
}