// drainrs/lib.rs

1#![feature(iter_intersperse)]
2#![feature(hash_raw_entry)]
3//! drainrs implements the [Drain](https://jiemingzhu.github.io/pub/pjhe_icws2017.pdf) algorithm for automatic log parsing.
4//! # Example:
5//! ```bash
6//!  cargo run ./apache-short.log | tail
7//! {"template":"[Sat Jun <*> <*> <*> [error] [client <*> script not found or unable to stat: /var/www/cgi-bin/awstats",
8//! "values":["11","03:03:04","2005]","202.133.98.6]"]}
9//! {"template":"[Sat Jun <*> <*> <*> [error] [client <*> script not found or unable to stat: /var/www/cgi-bin/awstats",
10//! "values":["11","03:03:04","2005]","202.133.98.6]"]}
11//! {"template":"[Sat Jun <*> <*> <*> [error] [client <*> script not found or unable to stat: /var/www/cgi-bin/awstats",
12//! "values":["11","03:03:04","2005]","202.133.98.6]"]}
13//! {"template":"[Sat Jun <*> <*> <*> [error] <*> Can't find child <*> in scoreboard",
14//! "values":["11","03:03:04","2005]","jk2_init()","4210"]}
15//! {"template":"[Sat Jun <*> <*> <*> [notice] workerEnv.init() ok <*>",
16//! "values":["11","03:03:04","2005]","/etc/httpd/conf/workers2.properties"]}
17//! {"template":"[Sat Jun <*> <*> <*> [error] mod_jk child init <*> <*>",
18//! "values":["11","03:03:04","2005]","1","-2"]}
19//! {"template":"[Sat Jun <*> <*> <*> [error] [client <*> script not found or unable to stat: /var/www/cgi-bin/awstats",
20//! "values":["11","03:03:04","2005]","202.133.98.6]"]}
21//! ```
22//!
23//!
24//! # Vocabulary
25//! A **log record** is an entry in a text file, typically one-line but doesn't have to be.
26//!
27//!   E.g. `[Thu Jun 09 06:07:05 2005] [notice] Digest logline here: done`
28//!
29//!
30//! A **log template** is the string template that was used to format log record.
31//!
32//! For example in Python format, that would look like this:
33//!
34//!   `"[{date}] [{log_level}] Digest logline here: {status}".format(...)`
35//!
36//! Or in the syntax output by drain.py and drain3.py:
37//!
38//!   `"[<date>] [<log_level>] Digest logline here: <*>"`
39//!
40//! # TODO
41//! * None of the parameters that are configurable in the Python version are yet configurable here.
42//! * The first drain allowed `split_line_provided`, which let you write a simple token-mapper like this:
43//!
44//!   `<timestamp> <loglevel> <content>`
45//!
46//! And then drain would only apply its logic to `<content>`.
47//!
48//! Drain3 appears to have dropped this in favor of preprocessing on the user-code side, which is fair enough, although
49//! the feature is very helpful from a cli/no-coding perspective.
50//!
51//! * Drain3 allows "masking", which appears to be for recognizing values like IPs or numbers.
52//! We have preliminary support for masking but it's not configurable from outside of the class and the user interface
53//! to it is not yet defined.
54
55use std::fmt;
56
57use json_in_type::list::ToJSONList;
58use json_in_type::*;
59use log::{debug, error};
60use rustc_hash::FxHashMap;
61use std::iter::zip;
62use thiserror::Error;
63
/// In the process of parsing, the drain algo populates a ParseTree. This tree could be saved
/// and re-used on the next run, to avoid "forgetting" the previously recognized log templates.
#[derive(Default)]
pub struct ParseTree {
    // First level of the prefix tree, keyed by token count; see `TreeRoot`.
    root: TreeRoot,
    // Monotonically increasing id handed to each newly discovered cluster/template.
    next_cluster_id: usize,
}
71
72fn zip_tokens_and_template<'c>(
73    templatetokens: &[LogTemplateItem],
74    logtokens: &[TokenParse<'c>],
75    results: &mut Vec<&'c str>,
76) {
77    results.clear();
78    for (template_token, log_token) in zip(templatetokens, logtokens) {
79        match template_token {
80            LogTemplateItem::StaticToken(_) => {}
81            LogTemplateItem::Value => match log_token {
82                TokenParse::Token(v) => results.push(*v),
83                TokenParse::MaskedValue(v) => results.push(*v),
84            },
85        }
86    }
87}
88
/// The elements in a LogTemplate (not a record).
/// Given a log-template (in string form) like this,
///
///   `"[{date}] [{log_level}] Digest logline here: {status}"`
///
///  the parsed rich-type form would be:
///
///   `[Value, Value, StaticToken("Digest"), StaticToken("logline"), StaticToken("here:"), Value]`
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum LogTemplateItem {
    StaticToken(String), // Owned because we need to store it.
    Value,               // Python port used "<*>" instead.
}

impl fmt::Display for LogTemplateItem {
    /// Renders static tokens verbatim and the wildcard as `<*>` (drain.py syntax).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let rendered = match self {
            Self::StaticToken(s) => s.as_str(),
            Self::Value => "<*>",
        };
        f.write_str(rendered)
    }
}
115
/// One whitespace-separated token of a log record, borrowed from the input line.
#[derive(Debug)]
enum TokenParse<'a> {
    /// A plain token; may later be judged static text or a wildcard value.
    Token(&'a str),
    /// A token the masking preprocessor already identified as a value.
    MaskedValue(&'a str),
}
/// Output of the masking pass over a record's content, before tokenization.
#[derive(Debug)]
enum Preprocessed<'a> {
    /// A stretch of raw text still to be split into tokens.
    Segment(&'a str),
    /// A span the mask regex matched; becomes a single `MaskedValue` token.
    Value(&'a str),
}
126
/// Errors surfaced while splitting the input into records.
#[derive(Error, Debug)]
pub enum ParseError {
    /// The record produced no tokens, or the user-provided split did not match the line.
    #[error("couldn't parse line with user defined template, multiline log msg?")]
    NoTokensInRecord,
}
132
/// For each log record, contains template_id the record belongs to, and `values` used to create the record.
#[derive(Debug)]
pub struct RecordParsed<'a> {
    /// Maps 1:1 to the order of NewTemplates recognized.
    /// Use this to map to any state you stored when you received a NewTemplate.
    pub template_id: usize,
    /// The values used to populate the record from the template. Given the following template:
    ///
    ///   `[Value, Value, StaticToken("Digest"), StaticToken("logline"), StaticToken("here:"), Value]`
    /// values for a particular record might be:
    ///
    /// E.g. ["Thu Jun 09 06:07:05 2005", "notice", "done"]
    pub values: Vec<&'a str>,
    // Can't get this to compile. Doesn't seem to be a big deal perf-wise.
    // pub values: &'short[&'a str],
}
149
/// When a new log-template is discovered, drainrs will return this item.
/// Don't forget to refer to first_parse: the record that triggered the new
/// template is delivered here and is not repeated as a separate RecordParsed.
#[derive(Debug)]
pub struct NewTemplate<'a> {
    /// The newly recognized template, in rich-token form.
    pub template: LogTemplate,
    /// The parse of the record that caused this template to be created.
    pub first_parse: RecordParsed<'a>,
}
157
/// See doc of each item.
#[derive(Debug)]
pub enum RecordsParsedResult<'a> {
    /// The record did not match any existing cluster; a template was created for it.
    NewTemplate(NewTemplate<'a>),
    /// The record matched an already-known template.
    RecordParsed(RecordParsed<'a>),
    /// The record could not be tokenized.
    ParseError(ParseError),
}
165
/// Iterator yielding every log record in the input string. A log record is generally a log-line,
/// but can be multi-line.
pub struct RecordsParsedIter<'a, 'b: 'a> {
    /// Remaining unparsed input; shrinks by one line per `next()` call.
    pub input: &'a str,
    /// The parse tree, grown in place as new templates are discovered.
    pub state: &'b mut ParseTree,
    // Scratch buffer of tokens for the current record, reused across records.
    tokens: Vec<TokenParse<'a>>,
    // Caller-provided scratch buffer for extracted values, reused across records.
    parsed: &'a mut Vec<&'a str>,
}
174
175impl<'a, 'b> RecordsParsedIter<'a, 'b> {
176    pub fn from(
177        input: &'a str,
178        state: &'b mut ParseTree,
179        parsed_buffer: &'a mut Vec<&'a str>,
180    ) -> RecordsParsedIter<'a, 'b> {
181        RecordsParsedIter {
182            input,
183            state,
184            tokens: Vec::new(),
185            parsed: parsed_buffer,
186        }
187    }
188}
189
impl<'a, 'b> Iterator for RecordsParsedIter<'a, 'b> {
    type Item = RecordsParsedResult<'a>;

    /// Consumes one line from `self.input` and classifies it against the parse
    /// tree: a hit yields `RecordParsed`, a miss adds the line to the tree and
    /// yields `NewTemplate`, a tokenization failure yields `ParseError`.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        // Take the next line; tolerate both \n and \r\n endings.
        let split_result = self.input.split_once('\n');
        let (line, next_input) = match split_result {
            Some((line, rest)) => (line.strip_suffix('\r').unwrap_or(line), rest),
            // Last line: consume the remainder and leave an empty slice behind.
            None => (self.input, &self.input[0..0]),
        };
        self.input = next_input;
        // NOTE(review): an empty line ends iteration entirely, even if more
        // input follows — a blank line mid-file is treated as end-of-input.
        if line.is_empty() {
            return None;
        }
        // TODO we should be able to handle multi-line logs, but the original paper doesn't.
        // It is easily fixed in the python by looking back, not so simple here.
        // We will probably have to iterate through lines looking ahead.

        // Step 1. First we split the line to get all of the tokens.
        // add_log_message from drain3.py
        // E.g. splits the line into chunks <timestamp> <loglevel> <content>
        let line_chunks = split_line_provided(line); // Pre-defined chunks as user-specified, like <Time> <Content>
        if line_chunks.is_none() {
            // Couldn't parse with the given regex, it's probably a multiline string. Attach it to the last-emitted log.
            // TODO attach_to_last_line(line);
            return Some(RecordsParsedResult::ParseError(
                ParseError::NoTokensInRecord,
            ));
        }
        // TODO Let Content be something not the last thing in the msg, like for trailing log-line tags.
        let line_chunks = line_chunks.unwrap();
        // The content chunk is assumed to be the last chunk.
        let log_content = *line_chunks.iter().rev().next().unwrap();

        // This is the masking feature from drain3; not clear what scenarios
        // would be best to use it, but running without it for now.
        // (`if false` keeps the masking branch compiling while disabling it.)
        let mut preprocessed = Vec::new();
        if false {
            // NOTE(review): the mask regex is hard-coded to digit runs and is
            // recompiled per record; hoist it if this branch is ever enabled.
            let re = Regex::new(r"\d+").unwrap();
            let mut last_index = 0;
            for mmatch in re.find_iter(log_content) {
                // Keep the raw text between matches as ordinary segments.
                if mmatch.start() > last_index {
                    preprocessed.push(Preprocessed::Segment(
                        &log_content[last_index..mmatch.start()],
                    ));
                }
                preprocessed.push(Preprocessed::Value(mmatch.as_str()));
                last_index = mmatch.end();
            }
            // Trailing text after the final match.
            if last_index != log_content.len() {
                preprocessed.push(Preprocessed::Segment(&log_content[last_index..]));
            }
        } else {
            preprocessed.push(Preprocessed::Segment(log_content));
        }

        // Tokenize: split segments on spaces/tabs; masked values pass through as single tokens.
        let tokens = &mut self.tokens;
        tokens.clear();
        debug!("preprocessed={:?}", preprocessed);
        for elem in preprocessed {
            match elem {
                Preprocessed::Segment(s) => tokens.extend(
                    s.split([' ', '\t'])
                        .filter(|s| !s.is_empty())
                        .map(TokenParse::Token),
                ),
                Preprocessed::Value(v) => tokens.push(TokenParse::MaskedValue(v)),
            }
        }

        if tokens.is_empty() {
            return Some(RecordsParsedResult::ParseError(
                ParseError::NoTokensInRecord,
            ));
        }

        // Step 2, we map #(num_tokens) => a parse tree with limited depth.
        let match_cluster = tree_search(&self.state.root, tokens);

        if match_cluster.is_none() {
            // We could also inline add_seq_to_prefix_tree here,
            // Either the prefix tree did not exist, in which case we have to add it and a one-cluster leaf-node.
            // Or, the prefix tree did exist, but no cluster matched above the threshold, so we need to add a cluster there.
            let match_cluster = Some(add_seq_to_prefix_tree(
                &mut self.state.root,
                &tokens,
                &mut self.state.next_cluster_id,
            ))
            .unwrap();
            self.parsed.clear();
            zip_tokens_and_template(&match_cluster.template, &tokens, self.parsed);
            return Some(Self::Item::NewTemplate(NewTemplate {
                // We can't return this because it would imply that our mutable self borrow in Self::next outlives 'a.
                // We could make this less-copy by using a streaming-iterator or just taking callbacks to call.
                // Unclear which would be more idiomatic, so leaving it alone for now.
                template: match_cluster.template.to_vec(),
                first_parse: RecordParsed {
                    values: self.parsed.to_vec(),
                    template_id: match_cluster.cluster_id,
                },
            }));
        }
        let match_cluster = match_cluster.unwrap();
        debug!("Line {} matched cluster: {:?}", line, match_cluster);
        // It feels like it should be doable to pass tokens without collecting it first,
        // maintaining its lifetime as pointing to the original record. but skipped for now
        // since can't figure out how to do that without .collect().
        self.parsed.clear();
        zip_tokens_and_template(&match_cluster.template, &tokens, self.parsed);
        return Some(Self::Item::RecordParsed(RecordParsed {
            values: self.parsed.to_vec(),
            template_id: match_cluster.cluster_id,
        }));
    }
}
304
/// True if `s` contains at least one numeric character; such tokens are
/// treated as likely parameter values rather than static template text.
fn has_numbers(s: &str) -> bool {
    s.contains(|c: char| c.is_numeric())
}
308
/// Splits a record into user-specified chunks (e.g. `<Time> <Content>`).
///
/// TODO: Port the regex-driven splitting from the Python version and return a
/// list of spanning indicators; until then the whole line is a single chunk.
/// (The parameter was previously named `_line` as if unused, but it is used.)
fn split_line_provided(line: &str) -> Option<Vec<&str>> {
    Some(vec![line])
}
313
/// A leaf entry in the parse tree: one discovered template plus its id.
#[derive(Debug)]
struct LogCluster {
    /// The token sequence (static tokens and wildcards) for this cluster.
    template: LogTemplate,
    /// Stable id reported to users as `RecordParsed::template_id`.
    cluster_id: usize,
    // size: i64 from Python == num_matches. Why track this? Seems to be only for debugging.
}
320
321fn sequence_distance(seq1: &[LogTemplateItem], seq2: &[TokenParse]) -> (f64, i64) {
322    assert!(seq1.len() == seq2.len());
323    if seq1.is_empty() {
324        return (1.0, 0);
325    }
326    let mut sim_tokens: i64 = 0;
327    let mut num_of_par = 0;
328    for (token1, token2) in seq1.iter().zip(seq2.iter()) {
329        match token1 {
330            LogTemplateItem::Value => num_of_par += 1,
331            LogTemplateItem::StaticToken(token1) => match token2 {
332                TokenParse::Token(token2) => {
333                    if token1 == token2 {
334                        sim_tokens += 1
335                    }
336                }
337                TokenParse::MaskedValue(_) => num_of_par += 1,
338            },
339        }
340    }
341    // Params don't match because this way we just skip params, and find the actually most-similar one, rather than letting
342    // templates like "* * * " dominate.
343
344    // let retVal = f64::from(simTokens) / f64::from(seq1.len());
345    let ret_val = sim_tokens as f64 / seq1.len() as f64;
346    (ret_val, num_of_par)
347}
348
349// const SIMILARITY_THRESHOLD: f64 = 0.7;
350const SIMILARITY_THRESHOLD: f64 = 0.4;
351fn fast_match<'a>(logclusts: &'a Vec<LogCluster>, tokens: &[TokenParse]) -> Option<&'a LogCluster> {
352    // Sequence similarity search.
353    let mut max_similarity = -1.0;
354    let mut max_param_count = -1;
355    let mut max_cluster = None;
356    // The rewritten Python version introduced an 'include wildcard in matching' flag that I don't see any reason to set to False.
357    // so, omitted here, back to original algo from paper.
358
359    for log_clust in logclusts {
360        let (cur_similarity, cur_num_params) = sequence_distance(&log_clust.template, tokens);
361        if cur_similarity > max_similarity
362            || (cur_similarity == max_similarity && cur_num_params > max_param_count)
363        {
364            max_similarity = cur_similarity;
365            max_param_count = cur_num_params;
366            max_cluster = Some(log_clust);
367        }
368    }
369
370    if max_similarity >= SIMILARITY_THRESHOLD {
371        max_cluster
372    } else {
373        None
374    }
375}
376
const MAX_DEPTH: usize = 4;
const MAX_CHILDREN: usize = 100;

/// Adds a new cluster for `tokens` to the prefix tree, creating any missing
/// interior nodes along the leading tokens, appends the new `LogCluster` to
/// the reached leaf, and returns a reference to it. Allocates the next
/// cluster id from `num_clusters`.
fn add_seq_to_prefix_tree<'a>(
    root: &'a mut TreeRoot,
    tokens: &Vec<TokenParse>,
    num_clusters: &mut usize,
) -> &'a LogCluster {
    // Make sure there is a num_token => middle_node element.
    let clust_id = *num_clusters;
    *num_clusters += 1;
    debug!("Adding seq {} to tree: {:?}", clust_id, tokens);
    let token_count = tokens.len();
    // NOTE(review): a single-token record would trip this assert; presumably
    // callers never produce one — TODO confirm.
    assert!(token_count >= 2);
    let mut cur_node = root.entry(token_count).or_insert_with(|| {
        GraphNodeContents::MiddleNode(MiddleNode {
            child_d: FxHashMap::default(),
        })
    });

    let mut current_depth = 1;
    for token in tokens {
        // Child factory: create a leaf when the next level is the last one we
        // index by (depth or length limit), otherwise another interior node.
        let inserter = || {
            if current_depth == MAX_DEPTH - 1 || current_depth == token_count - 1 {
                GraphNodeContents::LeafNode(Vec::new())
            } else {
                GraphNodeContents::MiddleNode(MiddleNode {
                    child_d: FxHashMap::default(),
                })
            }
        };

        // trace!("token: {:?} node {:?}", token, cur_node);
        cur_node = match cur_node {
            GraphNodeContents::MiddleNode(middle) => {
                assert!(!(current_depth >= MAX_DEPTH || current_depth >= token_count));
                // if token not matched in this layer of existing tree.
                let num_children = middle.child_d.len();
                match token {
                    // Masked values always route through the wildcard branch.
                    TokenParse::MaskedValue(_v) => middle
                        .child_d
                        .entry(LogTemplateItem::Value)
                        .or_insert_with(inserter),
                    TokenParse::Token(token) => {
                        let perfect_match_key = LogTemplateItem::StaticToken(token.to_string());
                        let found_node = middle.child_d.contains_key(&perfect_match_key);

                        // Double-lookup pleases the borrow-checker :shrug:
                        if found_node {
                            middle.child_d.get_mut(&perfect_match_key).unwrap()
                        } else {
                            // At first glance, skipping over '*' entries here is unintuitive. However, if we've made it to
                            // adding, then there was not a satisfactory match in the tree already. So we'll copy the original
                            // algo and make a new node even if there is already a star here, as long as no numbers.
                            // if self.parametrize_numeric_tokens
                            // If it's a numerical token, take the * path.
                            if has_numbers(token) || num_children >= MAX_CHILDREN {
                                middle
                                    .child_d
                                    .entry(LogTemplateItem::Value)
                                    .or_insert_with(inserter)
                            } else {
                                // It's not a numerical token, and there is room (maxChildren), add it.
                                middle
                                    .child_d
                                    .entry(perfect_match_key)
                                    .or_insert_with(inserter)
                            }
                        }
                    }
                }
            }
            GraphNodeContents::LeafNode(leaf) => {
                // if at max depth or this is last token in template - add current log cluster to the leaf node
                assert!(current_depth >= MAX_DEPTH || current_depth >= token_count);
                // Build the stored template: digit-bearing tokens and masked
                // values become wildcards, everything else is kept verbatim.
                leaf.push(LogCluster {
                    template: tokens
                        .iter()
                        .map(|tp| match tp {
                            TokenParse::Token(t) => match has_numbers(t) {
                                true => LogTemplateItem::Value,
                                false => LogTemplateItem::StaticToken(t.to_string()),
                            },
                            TokenParse::MaskedValue(_v) => LogTemplateItem::Value,
                        })
                        .collect(),
                    cluster_id: clust_id,
                });
                debug!("tree: {:?}", leaf);
                return &leaf[leaf.len() - 1];
            }
        };
        current_depth += 1
    }
    // The walk always hits a LeafNode (and returns) before tokens run out.
    unreachable!();
}
472
473// https://developer.ibm.com/blogs/how-mining-log-templates-can-help-ai-ops-in-cloud-scale-data-centers/
474
475fn tree_search<'a>(root: &'a TreeRoot, tokens: &[TokenParse]) -> Option<&'a LogCluster> {
476    let token_count = tokens.len();
477    assert!(token_count != 0);
478    let e = root.get(&token_count);
479    // No template with same token count yet.
480    e?;
481
482    let mut cur_node = e.unwrap();
483    /*if let GraphNodeContents::LeafNode(p) = parentn {
484        unreachable!("Shouldn't be possible.");
485    }*/
486    // let GraphNodeContents::MiddleNode(mut parentn) = parentn;
487    let mut current_depth = 1;
488    for token in tokens {
489        if current_depth >= MAX_DEPTH {
490            break;
491        }
492
493        let middle = match cur_node {
494            GraphNodeContents::MiddleNode(x) => x,
495            GraphNodeContents::LeafNode(_) => {
496                // Done, at leaf-node.
497                assert!(current_depth == token_count);
498                break;
499            }
500        };
501
502        // If we know it's a Value, go ahead and take that branch.
503        match token {
504            TokenParse::MaskedValue(_v) => {
505                let maybe_next = middle.child_d.get(&LogTemplateItem::Value);
506                if let Some(next) = maybe_next {
507                    cur_node = next;
508                } else {
509                    return None;
510                }
511            }
512            TokenParse::Token(token) => {
513                // Actually walking to next child, look for the token, or a wildcard, or fail.
514                let maybe_next = middle
515                    .child_d
516                    .get(&LogTemplateItem::StaticToken(token.to_string()));
517                if let Some(next) = maybe_next {
518                    cur_node = next;
519                } else if let Some(wildcard) = middle.child_d.get(&LogTemplateItem::Value) {
520                    cur_node = wildcard;
521                } else {
522                    return None; // Tried going down prefix tree that did not exist, need to make a new entry.
523                }
524            }
525        }
526        current_depth += 1;
527    }
528    // We have arrived at a list of LogClusters in a leaf-node.
529    // Now, from these clusters, we need to pick the one that matches the closest.
530    let log_clust = match cur_node {
531        GraphNodeContents::MiddleNode(_) => unreachable!("Mistake."),
532        GraphNodeContents::LeafNode(x) => x,
533    };
534    let ret_log_clust = fast_match(log_clust, tokens);
535    ret_log_clust
536}
537
/// Interior parse-tree node: maps the next token (or the wildcard) to a child.
#[derive(Debug)]
struct MiddleNode {
    // Children keyed by static token text or `LogTemplateItem::Value` (wildcard).
    child_d: FxHashMap<LogTemplateItem, GraphNodeContents>,
}

/// A parse-tree node: either an interior routing node or a leaf holding clusters.
#[derive(Debug)]
enum GraphNodeContents {
    MiddleNode(MiddleNode),
    LeafNode(Vec<LogCluster>),
}

/// First tree level: token count of the record -> subtree for that length.
type TreeRoot = FxHashMap<usize, GraphNodeContents>;
/// A parsed template: static tokens interleaved with wildcard value slots.
pub type LogTemplate = Vec<LogTemplateItem>;
551
552use regex::Regex;
553
554use std::fs::read_to_string;
555
556/// Barebones example usage of the crate. Reads whole file into memory.
557pub fn print_log(filename: &str, actually_print: bool) {
558    // Abstraction review is due here. We don't need the whole file,
559    // we support streaming. It seems a line-based iterator is probably best,
560    // as an alternative "chunks" leaves us having to deal with partial-line reads.
561    // But right now creating a RecordsParsedIter involves some allocation.
562    // So for now we'll stick with read_to_string.
563    // Probably solution is to move most of the fields of iter out to a ParserState
564    // Then user doesn't have to fiddle with making each piece individually here, either.
565    let s: _ = read_to_string(filename).unwrap();
566    let mut tree = ParseTree::default();
567    let mut template_names = Vec::new();
568    let handle_parse = |template_names: &[String], rp: &RecordParsed| {
569        let typ = &template_names[rp.template_id];
570        let obj = json_object! {
571            template: typ,
572            values: ToJSONList(rp.values.to_vec())};
573        if actually_print {
574            println!("{}", obj.to_json_string());
575        }
576    };
577
578    for record in RecordsParsedIter::from(&s, &mut tree, &mut Vec::new()) {
579
580        match record {
581            RecordsParsedResult::NewTemplate(template) => {
582                template_names.push(
583                    template
584                        .template
585                        .iter()
586                        .map(|t| t.to_string())
587                        .intersperse(" ".to_string())
588                        .collect::<String>(),
589                );
590
591                handle_parse(&template_names, &template.first_parse);
592            }
593            crate::RecordsParsedResult::RecordParsed(rp) => handle_parse(&template_names, &rp),
594            crate::RecordsParsedResult::ParseError(e) => error!("err: {}", e),
595        }
596    }
597}