drainrs/lib.rs
1#![feature(iter_intersperse)]
2#![feature(hash_raw_entry)]
3//! drainrs implements the [Drain](https://jiemingzhu.github.io/pub/pjhe_icws2017.pdf) algorithm for automatic log parsing.
4//! # Example:
5//! ```bash
6//! cargo run ./apache-short.log | tail
7//! {"template":"[Sat Jun <*> <*> <*> [error] [client <*> script not found or unable to stat: /var/www/cgi-bin/awstats",
8//! "values":["11","03:03:04","2005]","202.133.98.6]"]}
9//! {"template":"[Sat Jun <*> <*> <*> [error] [client <*> script not found or unable to stat: /var/www/cgi-bin/awstats",
10//! "values":["11","03:03:04","2005]","202.133.98.6]"]}
11//! {"template":"[Sat Jun <*> <*> <*> [error] [client <*> script not found or unable to stat: /var/www/cgi-bin/awstats",
12//! "values":["11","03:03:04","2005]","202.133.98.6]"]}
13//! {"template":"[Sat Jun <*> <*> <*> [error] <*> Can't find child <*> in scoreboard",
14//! "values":["11","03:03:04","2005]","jk2_init()","4210"]}
15//! {"template":"[Sat Jun <*> <*> <*> [notice] workerEnv.init() ok <*>",
16//! "values":["11","03:03:04","2005]","/etc/httpd/conf/workers2.properties"]}
17//! {"template":"[Sat Jun <*> <*> <*> [error] mod_jk child init <*> <*>",
18//! "values":["11","03:03:04","2005]","1","-2"]}
19//! {"template":"[Sat Jun <*> <*> <*> [error] [client <*> script not found or unable to stat: /var/www/cgi-bin/awstats",
20//! "values":["11","03:03:04","2005]","202.133.98.6]"]}
21//! ```
22//!
23//!
24//! # Vocabulary
25//! A **log record** is an entry in a text file, typically one-line but doesn't have to be.
26//!
27//! E.g. `[Thu Jun 09 06:07:05 2005] [notice] Digest logline here: done`
28//!
29//!
//! A **log template** is the string template that was used to format a log record.
31//!
32//! For example in Python format, that would look like this:
33//!
34//! `"[{date}] [{log_level}] Digest logline here: {status}".format(...)`
35//!
36//! Or in the syntax output by drain.py and drain3.py:
37//!
38//! `"[<date>] [<log_level>] Digest logline here: <*>"`
39//!
40//! # TODO
41//! * None of the parameters that are configurable in the Python version are yet configurable here.
42//! * The first drain allowed `split_line_provided`, which let you write a simple token-mapper like this:
43//!
44//! `<timestamp> <loglevel> <content>`
45//!
46//! And then drain would only apply its logic to `<content>`.
47//!
48//! Drain3 appears to have dropped this in favor of preprocessing on the user-code side, which is fair enough, although
49//! the feature is very helpful from a cli/no-coding perspective.
50//!
51//! * Drain3 allows "masking", which appears to be for recognizing values like IPs or numbers.
52//! We have preliminary support for masking but it's not configurable from outside of the class and the user interface
53//! to it is not yet defined.
54
55use std::fmt;
56
57use json_in_type::list::ToJSONList;
58use json_in_type::*;
59use log::{debug, error};
60use rustc_hash::FxHashMap;
61use std::iter::zip;
62use thiserror::Error;
63
/// In the process of parsing, the drain algo populates a ParseTree. This tree could be saved
/// and re-used on the next run, to avoid "forgetting" the previously recognized log templates.
#[derive(Default)]
pub struct ParseTree {
    /// Prefix tree; the first level is keyed by record token count (see `TreeRoot`).
    root: TreeRoot,
    /// Id handed to the next newly discovered cluster; increments monotonically.
    next_cluster_id: usize,
}
71
72fn zip_tokens_and_template<'c>(
73 templatetokens: &[LogTemplateItem],
74 logtokens: &[TokenParse<'c>],
75 results: &mut Vec<&'c str>,
76) {
77 results.clear();
78 for (template_token, log_token) in zip(templatetokens, logtokens) {
79 match template_token {
80 LogTemplateItem::StaticToken(_) => {}
81 LogTemplateItem::Value => match log_token {
82 TokenParse::Token(v) => results.push(*v),
83 TokenParse::MaskedValue(v) => results.push(*v),
84 },
85 }
86 }
87}
88
/// The elements in a LogTemplate (not a record).
/// Given a log-template (in string form) like this,
///
/// `"[{date}] [{log_level}] Digest logline here: {status}"`
///
/// the parsed rich-type form would be:
///
/// `[Value, Value, StaticToken("Digest"), StaticToken("logline"), StaticToken("here:"), Value]`
///
/// `Eq + Hash` are required because items are used as parse-tree keys.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum LogTemplateItem {
    /// A literal token that must match exactly.
    StaticToken(String), // Owned because we need to store it.
    /// A wildcard position filled by a record-specific value.
    Value, // Python port used "<*>" instead.
}
102
103impl fmt::Display for LogTemplateItem {
104 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
105 write!(
106 f,
107 "{}",
108 match self {
109 Self::StaticToken(s) => s,
110 Self::Value => "<*>",
111 }
112 )
113 }
114}
115
/// A single token from a log record, produced by the tokenization pass.
#[derive(Debug)]
enum TokenParse<'a> {
    /// Plain whitespace-delimited token.
    Token(&'a str),
    /// Token the masking preprocessor already recognized as a value.
    MaskedValue(&'a str),
}
/// Output of the (optional) masking pass over a record's content.
#[derive(Debug)]
enum Preprocessed<'a> {
    /// Raw text still to be split into tokens.
    Segment(&'a str),
    /// Text recognized as a value (e.g. a run of digits) by masking.
    Value(&'a str),
}
126
/// Errors that can occur while parsing a single log record.
#[derive(Error, Debug)]
pub enum ParseError {
    /// The record yielded no tokens, or the user-provided line template failed to match.
    #[error("couldn't parse line with user defined template, multiline log msg?")]
    NoTokensInRecord,
}
132
/// For each log record, contains template_id the record belongs to, and `values` used to create the record.
#[derive(Debug)]
pub struct RecordParsed<'a> {
    /// Maps 1:1 to the order of NewTemplates recognized.
    /// Use this to map to any state you stored when recvd NewTemplate.
    pub template_id: usize,
    /// The values used to populate the record from the template. Given the following template:
    ///
    /// `[Value, Value, StaticToken("Digest"), StaticToken("logline"), StaticToken("here:"), Value]`
    /// values for a particular record might be:
    ///
    /// E.g. ["Thu Jun 09 06:07:05 2005", "notice", "done"]
    ///
    /// Values borrow from the parsed input text, so they live only as long as it does.
    pub values: Vec<&'a str>,
    // Can't get this to compile. Doesn't seem to be a big deal perf-wise.
    // pub values: &'short[&'a str],
}
149
/// When a new log-template is discovered, drainrs will return this item.
/// Don't forget to refer to first_parse.
#[derive(Debug)]
pub struct NewTemplate<'a> {
    /// The newly recognized template, as static tokens and wildcards.
    pub template: LogTemplate,
    /// The parse of the record that triggered this template's creation;
    /// the iterator does not emit a separate RecordParsed for that record.
    pub first_parse: RecordParsed<'a>,
}
157
/// See doc of each item.
#[derive(Debug)]
pub enum RecordsParsedResult<'a> {
    /// The record created a previously unseen template; see [`NewTemplate`].
    NewTemplate(NewTemplate<'a>),
    /// The record matched an already-known template; see [`RecordParsed`].
    RecordParsed(RecordParsed<'a>),
    /// The record could not be parsed; see [`ParseError`].
    ParseError(ParseError),
}
165
/// Iterator yielding every log record in the input string. A log record is generally a log-line,
/// but can be multi-line.
pub struct RecordsParsedIter<'a, 'b: 'a> {
    /// Remaining unparsed input; shrinks as records are consumed.
    pub input: &'a str,
    /// Parse tree, borrowed so templates can be remembered across inputs.
    pub state: &'b mut ParseTree,
    // Scratch buffer of the current record's tokens, reused between `next` calls.
    tokens: Vec<TokenParse<'a>>,
    // Caller-provided scratch buffer for the extracted values.
    parsed: &'a mut Vec<&'a str>,
}
174
175impl<'a, 'b> RecordsParsedIter<'a, 'b> {
176 pub fn from(
177 input: &'a str,
178 state: &'b mut ParseTree,
179 parsed_buffer: &'a mut Vec<&'a str>,
180 ) -> RecordsParsedIter<'a, 'b> {
181 RecordsParsedIter {
182 input,
183 state,
184 tokens: Vec::new(),
185 parsed: parsed_buffer,
186 }
187 }
188}
189
190impl<'a, 'b> Iterator for RecordsParsedIter<'a, 'b> {
191 type Item = RecordsParsedResult<'a>;
192
193 #[inline]
194 fn next(&mut self) -> Option<Self::Item> {
195 let split_result = self.input.split_once('\n');
196 let (line, next_input) = match split_result {
197 Some((line, rest)) => (line.strip_suffix('\r').unwrap_or(line), rest),
198 None => (self.input, &self.input[0..0]),
199 };
200 self.input = next_input;
201 if line.is_empty() {
202 return None;
203 }
204 // TODO we should be able to handle multi-line logs, but the original paper doesn't.
205 // It is easily fixed in the python by looking back, not so simple here.
206 // We will probably have to iterate through lines looking ahead.
207
208 // Step 1. First we split the line to get all of the tokens.
209 // add_log_message from drain3.py
210 // E.g. splits the line into chunks <timestamp> <loglevel> <content>
211 let line_chunks = split_line_provided(line); // Pre-defined chunks as user-specified, like <Time> <Content>
212 if line_chunks.is_none() {
213 // Couldn't parse with the given regex, it's probably a multiline string. Attach it to the last-emitted log.
214 // TODO attach_to_last_line(line);
215 return Some(RecordsParsedResult::ParseError(
216 ParseError::NoTokensInRecord,
217 ));
218 }
219 // TODO Let Content be something not the last thing in the msg, like for trailing log-line tags.
220 let line_chunks = line_chunks.unwrap();
221 let log_content = *line_chunks.iter().rev().next().unwrap();
222
223 // This is the masking feature from drain3; not clear what scenarios
224 // would be best to use it, but running without it for now.
225 let mut preprocessed = Vec::new();
226 if false {
227 let re = Regex::new(r"\d+").unwrap();
228 let mut last_index = 0;
229 for mmatch in re.find_iter(log_content) {
230 if mmatch.start() > last_index {
231 preprocessed.push(Preprocessed::Segment(
232 &log_content[last_index..mmatch.start()],
233 ));
234 }
235 preprocessed.push(Preprocessed::Value(mmatch.as_str()));
236 last_index = mmatch.end();
237 }
238 if last_index != log_content.len() {
239 preprocessed.push(Preprocessed::Segment(&log_content[last_index..]));
240 }
241 } else {
242 preprocessed.push(Preprocessed::Segment(log_content));
243 }
244
245 let tokens = &mut self.tokens;
246 tokens.clear();
247 debug!("preprocessed={:?}", preprocessed);
248 for elem in preprocessed {
249 match elem {
250 Preprocessed::Segment(s) => tokens.extend(
251 s.split([' ', '\t'])
252 .filter(|s| !s.is_empty())
253 .map(TokenParse::Token),
254 ),
255 Preprocessed::Value(v) => tokens.push(TokenParse::MaskedValue(v)),
256 }
257 }
258
259 if tokens.is_empty() {
260 return Some(RecordsParsedResult::ParseError(
261 ParseError::NoTokensInRecord,
262 ));
263 }
264
265 // Step 2, we map #(num_tokens) => a parse tree with limited depth.
266 let match_cluster = tree_search(&self.state.root, tokens);
267
268 if match_cluster.is_none() {
269 // We could also inline add_seq_to_prefix_tree here,
270 // Either the prefix tree did not exist, in which case we have to add it and a one-cluster leaf-node.
271 // Or, the prefix tree did exist, but no cluster matched above the threshold, so we need to add a cluster there.
272 let match_cluster = Some(add_seq_to_prefix_tree(
273 &mut self.state.root,
274 &tokens,
275 &mut self.state.next_cluster_id,
276 ))
277 .unwrap();
278 self.parsed.clear();
279 zip_tokens_and_template(&match_cluster.template, &tokens, self.parsed);
280 return Some(Self::Item::NewTemplate(NewTemplate {
281 // We can't return this because it would imply that our mutable self borrow in Self::next outlives 'a.
282 // We could make this less-copy by using a streaming-iterator or just taking callbacks to call.
283 // Unclear which would be more idiomatic, so leaving it alone for now.
284 template: match_cluster.template.to_vec(),
285 first_parse: RecordParsed {
286 values: self.parsed.to_vec(),
287 template_id: match_cluster.cluster_id,
288 },
289 }));
290 }
291 let match_cluster = match_cluster.unwrap();
292 debug!("Line {} matched cluster: {:?}", line, match_cluster);
293 // It feels like it should be doable to pass tokens without collecting it first,
294 // maintaining its lifetime as pointing to the original record. but skipped for now
295 // since can't figure out how to do that without .collect().
296 self.parsed.clear();
297 zip_tokens_and_template(&match_cluster.template, &tokens, self.parsed);
298 return Some(Self::Item::RecordParsed(RecordParsed {
299 values: self.parsed.to_vec(),
300 template_id: match_cluster.cluster_id,
301 }));
302 }
303}
304
/// True when any character of `s` is numeric (Unicode-aware, per `char::is_numeric`).
fn has_numbers(s: &str) -> bool {
    s.contains(char::is_numeric)
}
308
/// Splits a line into the user-provided chunks (e.g. `<Time> <Content>`).
/// Currently a stub: the whole line is returned as the single chunk.
fn split_line_provided(line: &str) -> Option<Vec<&str>> {
    // TODO Copy regex from python, return list of spanning indicators.
    let chunks = vec![line];
    Some(chunks)
}
313
/// A leaf-level cluster: one discovered template plus its stable id.
#[derive(Debug)]
struct LogCluster {
    /// The template tokens (static tokens and `Value` wildcards).
    template: LogTemplate,
    /// Stable id, assigned in discovery order.
    cluster_id: usize,
    // size: i64 from Python == num_matches. Why track this? Seems to be only for debugging.
}
320
321fn sequence_distance(seq1: &[LogTemplateItem], seq2: &[TokenParse]) -> (f64, i64) {
322 assert!(seq1.len() == seq2.len());
323 if seq1.is_empty() {
324 return (1.0, 0);
325 }
326 let mut sim_tokens: i64 = 0;
327 let mut num_of_par = 0;
328 for (token1, token2) in seq1.iter().zip(seq2.iter()) {
329 match token1 {
330 LogTemplateItem::Value => num_of_par += 1,
331 LogTemplateItem::StaticToken(token1) => match token2 {
332 TokenParse::Token(token2) => {
333 if token1 == token2 {
334 sim_tokens += 1
335 }
336 }
337 TokenParse::MaskedValue(_) => num_of_par += 1,
338 },
339 }
340 }
341 // Params don't match because this way we just skip params, and find the actually most-similar one, rather than letting
342 // templates like "* * * " dominate.
343
344 // let retVal = f64::from(simTokens) / f64::from(seq1.len());
345 let ret_val = sim_tokens as f64 / seq1.len() as f64;
346 (ret_val, num_of_par)
347}
348
349// const SIMILARITY_THRESHOLD: f64 = 0.7;
350const SIMILARITY_THRESHOLD: f64 = 0.4;
351fn fast_match<'a>(logclusts: &'a Vec<LogCluster>, tokens: &[TokenParse]) -> Option<&'a LogCluster> {
352 // Sequence similarity search.
353 let mut max_similarity = -1.0;
354 let mut max_param_count = -1;
355 let mut max_cluster = None;
356 // The rewritten Python version introduced an 'include wildcard in matching' flag that I don't see any reason to set to False.
357 // so, omitted here, back to original algo from paper.
358
359 for log_clust in logclusts {
360 let (cur_similarity, cur_num_params) = sequence_distance(&log_clust.template, tokens);
361 if cur_similarity > max_similarity
362 || (cur_similarity == max_similarity && cur_num_params > max_param_count)
363 {
364 max_similarity = cur_similarity;
365 max_param_count = cur_num_params;
366 max_cluster = Some(log_clust);
367 }
368 }
369
370 if max_similarity >= SIMILARITY_THRESHOLD {
371 max_cluster
372 } else {
373 None
374 }
375}
376
377const MAX_DEPTH: usize = 4;
378const MAX_CHILDREN: usize = 100;
379fn add_seq_to_prefix_tree<'a>(
380 root: &'a mut TreeRoot,
381 tokens: &Vec<TokenParse>,
382 num_clusters: &mut usize,
383) -> &'a LogCluster {
384 // Make sure there is a num_token => middle_node element.
385 let clust_id = *num_clusters;
386 *num_clusters += 1;
387 debug!("Adding seq {} to tree: {:?}", clust_id, tokens);
388 let token_count = tokens.len();
389 assert!(token_count >= 2);
390 let mut cur_node = root.entry(token_count).or_insert_with(|| {
391 GraphNodeContents::MiddleNode(MiddleNode {
392 child_d: FxHashMap::default(),
393 })
394 });
395
396 let mut current_depth = 1;
397 for token in tokens {
398 let inserter = || {
399 if current_depth == MAX_DEPTH - 1 || current_depth == token_count - 1 {
400 GraphNodeContents::LeafNode(Vec::new())
401 } else {
402 GraphNodeContents::MiddleNode(MiddleNode {
403 child_d: FxHashMap::default(),
404 })
405 }
406 };
407
408 // trace!("token: {:?} node {:?}", token, cur_node);
409 cur_node = match cur_node {
410 GraphNodeContents::MiddleNode(middle) => {
411 assert!(!(current_depth >= MAX_DEPTH || current_depth >= token_count));
412 // if token not matched in this layer of existing tree.
413 let num_children = middle.child_d.len();
414 match token {
415 TokenParse::MaskedValue(_v) => middle
416 .child_d
417 .entry(LogTemplateItem::Value)
418 .or_insert_with(inserter),
419 TokenParse::Token(token) => {
420 let perfect_match_key = LogTemplateItem::StaticToken(token.to_string());
421 let found_node = middle.child_d.contains_key(&perfect_match_key);
422
423 // Double-lookup pleases the borrow-checker :shrug:
424 if found_node {
425 middle.child_d.get_mut(&perfect_match_key).unwrap()
426 } else {
427 // At first glance, skipping over '*' entries here is unintuitive. However, if we've made it to
428 // adding, then there was not a satisfactory match in the tree already. So we'll copy the original
429 // algo and make a new node even if there is already a star here, as long as no numbers.
430 // if self.parametrize_numeric_tokens
431 // If it's a numerical token, take the * path.
432 if has_numbers(token) || num_children >= MAX_CHILDREN {
433 middle
434 .child_d
435 .entry(LogTemplateItem::Value)
436 .or_insert_with(inserter)
437 } else {
438 // It's not a numerical token, and there is room (maxChildren), add it.
439 middle
440 .child_d
441 .entry(perfect_match_key)
442 .or_insert_with(inserter)
443 }
444 }
445 }
446 }
447 }
448 GraphNodeContents::LeafNode(leaf) => {
449 // if at max depth or this is last token in template - add current log cluster to the leaf node
450 assert!(current_depth >= MAX_DEPTH || current_depth >= token_count);
451 leaf.push(LogCluster {
452 template: tokens
453 .iter()
454 .map(|tp| match tp {
455 TokenParse::Token(t) => match has_numbers(t) {
456 true => LogTemplateItem::Value,
457 false => LogTemplateItem::StaticToken(t.to_string()),
458 },
459 TokenParse::MaskedValue(_v) => LogTemplateItem::Value,
460 })
461 .collect(),
462 cluster_id: clust_id,
463 });
464 debug!("tree: {:?}", leaf);
465 return &leaf[leaf.len() - 1];
466 }
467 };
468 current_depth += 1
469 }
470 unreachable!();
471}
472
473// https://developer.ibm.com/blogs/how-mining-log-templates-can-help-ai-ops-in-cloud-scale-data-centers/
474
475fn tree_search<'a>(root: &'a TreeRoot, tokens: &[TokenParse]) -> Option<&'a LogCluster> {
476 let token_count = tokens.len();
477 assert!(token_count != 0);
478 let e = root.get(&token_count);
479 // No template with same token count yet.
480 e?;
481
482 let mut cur_node = e.unwrap();
483 /*if let GraphNodeContents::LeafNode(p) = parentn {
484 unreachable!("Shouldn't be possible.");
485 }*/
486 // let GraphNodeContents::MiddleNode(mut parentn) = parentn;
487 let mut current_depth = 1;
488 for token in tokens {
489 if current_depth >= MAX_DEPTH {
490 break;
491 }
492
493 let middle = match cur_node {
494 GraphNodeContents::MiddleNode(x) => x,
495 GraphNodeContents::LeafNode(_) => {
496 // Done, at leaf-node.
497 assert!(current_depth == token_count);
498 break;
499 }
500 };
501
502 // If we know it's a Value, go ahead and take that branch.
503 match token {
504 TokenParse::MaskedValue(_v) => {
505 let maybe_next = middle.child_d.get(&LogTemplateItem::Value);
506 if let Some(next) = maybe_next {
507 cur_node = next;
508 } else {
509 return None;
510 }
511 }
512 TokenParse::Token(token) => {
513 // Actually walking to next child, look for the token, or a wildcard, or fail.
514 let maybe_next = middle
515 .child_d
516 .get(&LogTemplateItem::StaticToken(token.to_string()));
517 if let Some(next) = maybe_next {
518 cur_node = next;
519 } else if let Some(wildcard) = middle.child_d.get(&LogTemplateItem::Value) {
520 cur_node = wildcard;
521 } else {
522 return None; // Tried going down prefix tree that did not exist, need to make a new entry.
523 }
524 }
525 }
526 current_depth += 1;
527 }
528 // We have arrived at a list of LogClusters in a leaf-node.
529 // Now, from these clusters, we need to pick the one that matches the closest.
530 let log_clust = match cur_node {
531 GraphNodeContents::MiddleNode(_) => unreachable!("Mistake."),
532 GraphNodeContents::LeafNode(x) => x,
533 };
534 let ret_log_clust = fast_match(log_clust, tokens);
535 ret_log_clust
536}
537
/// Interior node of the prefix tree: maps the next token (or wildcard) to a child node.
#[derive(Debug)]
struct MiddleNode {
    child_d: FxHashMap<LogTemplateItem, GraphNodeContents>,
}

/// A parse-tree node: either another branching level or a leaf holding the
/// candidate clusters to score.
#[derive(Debug)]
enum GraphNodeContents {
    MiddleNode(MiddleNode),
    LeafNode(Vec<LogCluster>),
}

/// Top level of the parse tree, keyed by record token count.
type TreeRoot = FxHashMap<usize, GraphNodeContents>;
/// A parsed template: a sequence of static tokens and `Value` wildcards.
pub type LogTemplate = Vec<LogTemplateItem>;
551
552use regex::Regex;
553
554use std::fs::read_to_string;
555
556/// Barebones example usage of the crate. Reads whole file into memory.
557pub fn print_log(filename: &str, actually_print: bool) {
558 // Abstraction review is due here. We don't need the whole file,
559 // we support streaming. It seems a line-based iterator is probably best,
560 // as an alternative "chunks" leaves us having to deal with partial-line reads.
561 // But right now creating a RecordsParsedIter involves some allocation.
562 // So for now we'll stick with read_to_string.
563 // Probably solution is to move most of the fields of iter out to a ParserState
564 // Then user doesn't have to fiddle with making each piece individually here, either.
565 let s: _ = read_to_string(filename).unwrap();
566 let mut tree = ParseTree::default();
567 let mut template_names = Vec::new();
568 let handle_parse = |template_names: &[String], rp: &RecordParsed| {
569 let typ = &template_names[rp.template_id];
570 let obj = json_object! {
571 template: typ,
572 values: ToJSONList(rp.values.to_vec())};
573 if actually_print {
574 println!("{}", obj.to_json_string());
575 }
576 };
577
578 for record in RecordsParsedIter::from(&s, &mut tree, &mut Vec::new()) {
579
580 match record {
581 RecordsParsedResult::NewTemplate(template) => {
582 template_names.push(
583 template
584 .template
585 .iter()
586 .map(|t| t.to_string())
587 .intersperse(" ".to_string())
588 .collect::<String>(),
589 );
590
591 handle_parse(&template_names, &template.first_parse);
592 }
593 crate::RecordsParsedResult::RecordParsed(rp) => handle_parse(&template_names, &rp),
594 crate::RecordsParsedResult::ParseError(e) => error!("err: {}", e),
595 }
596 }
597}