Mem_Parser 0.1.0

Zero-copy log parser with mmap input, streaming lines, and optional bump arena AST
Documentation
//! Zero-copy semi-structured log parsing: streaming lines plus optional arena AST.
//!
//! # mmap safety
//!
//! See [`MappedFile`](source::MappedFile) for the safety notes on memory-mapped input.

mod arena_tree;
mod error;
mod lexer;
mod source;
mod span;
mod stream;

pub use arena_tree::{ArenaParser, FieldNode, LineAst};
pub use error::ParseError;
pub use lexer::{
    fields_on_line, lines_in_str, push_fields_on_line, FieldRef, LineIter, LineView, LogDialect,
    SplitMode,
};
pub use source::{MappedFile, Source};
pub use span::Span;
pub use stream::{parse_log_stream, BorrowedRecord, LogStream};

#[cfg(test)]
mod tests {
    //! Unit tests for the line iterator, field lexer, record stream, arena parser and
    //! memory-mapped input. Everything is imported through the crate-root re-exports so
    //! the tests also exercise the public API surface.

    use std::io::Write;

    use bumpalo::Bump;
    use tempfile::NamedTempFile;

    use crate::{
        fields_on_line, ArenaParser, FieldNode, LineIter, LineView, LogDialect, LogStream,
        MappedFile, ParseError, SplitMode,
    };

    /// An empty buffer produces no lines at all (not a single empty line).
    #[test]
    fn empty_input_yields_no_lines() {
        let mut it = LineIter::new(b"", None);
        assert!(it.next().is_none());
    }

    /// A final line without a terminating newline is still yielded, exactly once.
    #[test]
    fn single_line_without_newline() {
        let buf = b"hello";
        let mut it = LineIter::new(buf, None).map(|x| x.unwrap());
        assert_eq!(
            it.next(),
            Some(LineView {
                text: "hello",
                byte_offset: 0
            })
        );
        assert!(it.next().is_none());
    }

    /// `\r\n` terminators are stripped entirely from the yielded text.
    #[test]
    fn crlf_strips_carriage_return() {
        let buf = b"a\r\n";
        let mut it = LineIter::new(buf, None).map(|x| x.unwrap());
        assert_eq!(
            it.next(),
            Some(LineView {
                text: "a",
                byte_offset: 0
            })
        );
        // A trailing line terminator must not produce an extra empty line.
        assert!(it.next().is_none());
    }

    /// A line longer than the configured maximum surfaces `LineTooLong` with the
    /// line start, the observed length and the limit.
    #[test]
    fn line_too_long_errors() {
        let buf = b"abcdef";
        let mut it = LineIter::new(buf, Some(3));
        assert_eq!(
            it.next(),
            Some(Err(ParseError::LineTooLong {
                line_start: 0,
                len: 6,
                max: 3
            }))
        );
    }

    /// Non-UTF-8 bytes on a line are reported as `InvalidUtf8` rather than panicking.
    #[test]
    fn invalid_utf8_surface_at_line() {
        let buf = [0xff, 0xfe, b'\n'];
        let mut it = LineIter::new(&buf, None);
        match it.next() {
            // `byte` is presumably the offset of the first offending byte; here that is 0.
            Some(Err(ParseError::InvalidUtf8 { byte })) => assert_eq!(byte, 0),
            other => panic!("expected InvalidUtf8, got {other:?}"),
        }
    }

    /// Default dialect: `key=value` pairs are split, a bare token gets an empty key,
    /// and a run of spaces does not produce an empty field.
    #[test]
    fn key_value_fields() {
        let lv = LineView {
            text: "a=1 b=2  token",
            byte_offset: 10,
        };
        let fields = fields_on_line(lv, LogDialect::default());
        assert_eq!(fields.len(), 3);
        assert_eq!(fields[0].key, "a");
        assert_eq!(fields[0].value, "1");
        assert_eq!(fields[1].key, "b");
        assert_eq!(fields[1].value, "2");
        assert_eq!(fields[2].key, "");
        assert_eq!(fields[2].value, "token");
        // The span of the line starts at the line's byte offset in the source buffer.
        assert_eq!(lv.span().start, 10);
    }

    /// `SplitMode::SpaceTokens` yields one field per whitespace-separated token.
    #[test]
    fn space_tokens_mode() {
        let lv = LineView {
            text: "one two",
            byte_offset: 0,
        };
        let dialect = LogDialect {
            split_mode: SplitMode::SpaceTokens,
            ..LogDialect::default()
        };
        let fields = fields_on_line(lv, dialect);
        assert_eq!(fields.len(), 2);
        assert_eq!(fields[0].value, "one");
        assert_eq!(fields[1].value, "two");
    }

    /// The stream yields one record per line, including empty lines, and then ends.
    #[test]
    fn log_stream_collects_kv() {
        let dialect = LogDialect::default();
        let mut stream = LogStream::new(b"foo=bar x=y\n\r\nzoop=zap", dialect, None);

        let first = stream.next().unwrap().unwrap();
        assert_eq!(first.fields.len(), 2);
        assert_eq!(first.fields[0].key, "foo");
        assert_eq!(first.fields[0].value, "bar");

        // The bare `\r\n` in the middle is an empty line, not a skipped one.
        let second = stream.next().unwrap().unwrap();
        assert!(second.line.text.is_empty());

        let third = stream.next().unwrap().unwrap();
        assert_eq!(third.fields[0].key, "zoop");
        assert_eq!(third.fields[0].value, "zap");

        // The unterminated last line is the final record.
        assert!(stream.next().is_none());
    }

    /// Arena-built AST nodes borrow from the input line instead of copying it.
    #[test]
    fn arena_parse_line_slices_point_into_source() {
        let bump = Bump::new();
        let parser = ArenaParser::new(LogDialect::default());
        let line = LineView {
            text: "k=v",
            byte_offset: 0,
        };
        let ast = parser.parse_line_to_ast(&bump, line);
        assert_eq!(ast.fields.len(), 1);
        let FieldNode { key, value } = ast.fields[0];
        assert_eq!(key, "k");
        assert_eq!(value, "v");
        // Same address as the start of the line: the key is a sub-slice of the source.
        assert!(std::ptr::eq(line.text.as_ptr(), key.as_ptr()));
    }

    /// Writes two lines to a temp file, maps it, and parses it through the stream.
    #[test]
    fn mmap_roundtrip_stream() -> std::io::Result<()> {
        let mut tmp = NamedTempFile::new()?;
        writeln!(tmp, "lvl=INFO msg=start")?;
        writeln!(tmp, "lvl=DEBUG msg=poll")?;
        // Make sure everything written is handed to the OS before the file is mapped.
        tmp.flush()?;

        let mapped = MappedFile::map_path(tmp.path())?;
        let dialect = LogDialect::default();
        // Unwrap every item: a bare `.count()` would also count `Err` results and let
        // the test pass even if no line parsed successfully.
        let records: Vec<_> = LogStream::new(mapped.as_bytes(), dialect, None)
            .map(|r| r.expect("line"))
            .collect();
        assert_eq!(records.len(), 2);
        Ok(())
    }

    /// Parses the checked-in LF sample log via mmap and spot-checks the first record.
    #[test]
    fn testdata_sample_log_mmap_stream() {
        let path = std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata/sample.log");
        let mapped = MappedFile::map_path(&path).expect("mmap testdata/sample.log");
        let dialect = LogDialect::default();
        let records: Vec<_> = LogStream::new(mapped.as_bytes(), dialect, None)
            .map(|r| r.expect("line"))
            .collect();
        assert_eq!(records.len(), 7);
        let lvl = records[0]
            .fields
            .iter()
            .find(|f| f.key == "lvl")
            .expect("lvl field");
        assert_eq!(lvl.value, "INFO");
    }

    /// Parses the checked-in CRLF sample log via mmap; line text must not retain `\r`.
    #[test]
    fn testdata_crlf_sample_mmap_stream() {
        let path =
            std::path::Path::new(env!("CARGO_MANIFEST_DIR")).join("testdata/crlf-sample.log");
        let mapped = MappedFile::map_path(&path).expect("mmap testdata/crlf-sample.log");
        let dialect = LogDialect::default();
        let records: Vec<_> = LogStream::new(mapped.as_bytes(), dialect, None)
            .map(|r| r.expect("line"))
            .collect();
        assert_eq!(records.len(), 2);
        assert_eq!(
            records[0].line.text,
            "lvl=START service=ingress region=us-east"
        );
    }
}