use memchr::memmem::Finder;
use memchr::{memchr, memrchr};
use memmap2::Mmap;
use simdutf8::basic::from_utf8 as simd_from_utf8;
use std::borrow::Cow;
use std::fs::File;
use std::path::Path;
use std::sync::LazyLock;
use crate::error::ParseError;
use crate::sqllog::Sqllog;
use encoding::all::GB18030;
use encoding::{DecoderTrap, Encoding};
/// Lazily built substring searcher for the two-byte sequence `") "`.
///
/// `parse_record_with_hint` uses it to locate the end of the parenthesised
/// metadata section on a record's first line.
static FINDER_CLOSE_META: LazyLock<Finder<'static>> = LazyLock::new(|| Finder::new(b") "));
/// Tells the record parser how to decode the textual fields of a record.
#[derive(Copy, Clone, Debug, PartialEq, Eq, Default)]
pub(crate) enum FileEncodingHint {
/// Encoding unknown: the metadata field is tried as UTF-8 first and as
/// GB18030 if that fails. This is the hint `parse_record` passes.
#[default]
Auto,
/// Chosen by `LogParser::from_path` when the leading sample of the file
/// validates as UTF-8.
Utf8,
/// Chosen by `LogParser::from_path` when the leading sample of the file is
/// not valid UTF-8.
Gb18030,
}
/// A log file opened for parsing; created with `LogParser::from_path`.
pub struct LogParser {
// Read-only memory map of the whole file; iterators borrow record bytes from it.
mmap: Mmap,
// Encoding guessed once in `from_path` and handed to every iterator.
encoding: FileEncodingHint,
}
impl LogParser {
    /// Number of leading bytes inspected when guessing the file encoding.
    const ENCODING_SAMPLE_LEN: usize = 65536;

    /// Opens `path`, memory-maps it and guesses its text encoding.
    ///
    /// The encoding is decided from the first [`Self::ENCODING_SAMPLE_LEN`] bytes:
    /// if they validate as UTF-8 the file is treated as UTF-8, otherwise as GB18030.
    ///
    /// # Errors
    /// Returns `ParseError::IoError` (carrying the OS error text) when the file
    /// cannot be opened or mapped.
    pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Self, ParseError> {
        let file = File::open(path).map_err(|e| ParseError::IoError(e.to_string()))?;
        // SAFETY: the mapping is only ever read through `&[u8]`.
        // NOTE(review): this relies on the file not being truncated or rewritten
        // by another process while it is mapped; nothing here enforces that.
        let mmap = unsafe { Mmap::map(&file) }.map_err(|e| ParseError::IoError(e.to_string()))?;

        let sample = &mmap[..mmap.len().min(Self::ENCODING_SAMPLE_LEN)];
        let sample_is_truncated = sample.len() < mmap.len();
        // When the sample is a prefix of a larger file, the cut may fall inside a
        // multi-byte UTF-8 character. `error_len() == None` means "input ended in
        // the middle of a sequence", i.e. everything before the cut was valid, so
        // such a sample must still count as UTF-8.
        let looks_utf8 = simd_from_utf8(sample).is_ok()
            || (sample_is_truncated
                && matches!(std::str::from_utf8(sample), Err(e) if e.error_len().is_none()));

        let encoding = if looks_utf8 {
            FileEncodingHint::Utf8
        } else {
            FileEncodingHint::Gb18030
        };
        Ok(Self { mmap, encoding })
    }

    /// Returns a sequential iterator over all records of the file, in file order.
    pub fn iter(&self) -> LogIterator<'_> {
        LogIterator {
            data: &self.mmap,
            pos: 0,
            encoding: self.encoding,
        }
    }

    /// Returns a rayon parallel iterator over all records of the file.
    ///
    /// The file is cut into at most `rayon::current_num_threads()` chunks whose
    /// boundaries are moved forward to the next record start (see
    /// `find_next_record_start`), and each chunk is parsed by its own
    /// [`LogIterator`].
    pub fn par_iter(
        &self,
    ) -> impl rayon::iter::ParallelIterator<Item = Result<Sqllog<'_>, ParseError>> + '_ {
        use rayon::prelude::*;

        let data: &[u8] = &self.mmap;
        let encoding = self.encoding;
        let num_threads = rayon::current_num_threads().max(1);

        let mut starts: Vec<usize> = Vec::with_capacity(num_threads + 1);
        starts.push(0);
        if !data.is_empty() {
            let chunk_size = (data.len() / num_threads).max(1);
            for i in 1..num_threads {
                let offset = i * chunk_size;
                // With fewer bytes than threads `chunk_size` is clamped to 1 and the
                // offset would run past the end of the data; stop splitting there.
                if offset >= data.len() {
                    break;
                }
                let boundary = find_next_record_start(data, offset);
                if boundary < data.len() {
                    starts.push(boundary);
                }
            }
        }
        starts.push(data.len());
        // Several offsets can snap to the same record start; boundaries are
        // non-decreasing, so removing adjacent duplicates is sufficient.
        starts.dedup();

        let bounds: Vec<(usize, usize)> = starts.windows(2).map(|w| (w[0], w[1])).collect();
        bounds
            .into_par_iter()
            .flat_map_iter(move |(start, end)| LogIterator {
                data: &data[start..end],
                pos: 0,
                encoding,
            })
    }
}
/// Iterator that splits a byte slice into records and parses each one.
pub struct LogIterator<'a> {
// Bytes being iterated: the whole file for `LogParser::iter`, one chunk for `par_iter`.
data: &'a [u8],
// Offset into `data` at which the next record starts.
pos: usize,
// Encoding hint forwarded to the record parser.
encoding: FileEncodingHint,
}
impl<'a> Iterator for LogIterator<'a> {
    type Item = Result<Sqllog<'a>, ParseError>;

    /// Yields the next record, parsed with the iterator's encoding hint.
    ///
    /// A record runs from the current position up to (not including) the line
    /// break that precedes the next line that looks like a timestamp, or up to the
    /// end of the data. The record's own line terminator (`\n` or `\r\n`) is never
    /// part of the bytes handed to the parser, whether the record is followed by
    /// another record or by the end of the data. Records that are empty after
    /// that trimming are skipped.
    fn next(&mut self) -> Option<Self::Item> {
        /// True when `line` starts with the `20YY-MM-DD HH:MM:SS.mmm` shape
        /// (only the fixed punctuation and the leading "20" are checked).
        fn begins_record(line: &[u8]) -> bool {
            line.len() >= 23
                && line[0] == b'2'
                && line[1] == b'0'
                && line[4] == b'-'
                && line[7] == b'-'
                && line[10] == b' '
                && line[13] == b':'
                && line[16] == b':'
                && line[19] == b'.'
        }

        let buf: &'a [u8] = self.data;
        loop {
            if self.pos >= buf.len() {
                return None;
            }
            let data = &buf[self.pos..];

            // Walk line by line until the following line starts a new record.
            let mut scan_pos = 0;
            let mut boundary = None;
            let mut is_multiline = false;
            while let Some(rel) = memchr(b'\n', &data[scan_pos..]) {
                let newline_idx = scan_pos + rel;
                let next_line_start = newline_idx + 1;
                if next_line_start >= data.len() {
                    // The newline is the very last byte: nothing follows it.
                    break;
                }
                if begins_record(&data[next_line_start..]) {
                    boundary = Some(newline_idx);
                    break;
                }
                // The next line is a continuation of the current record.
                is_multiline = true;
                scan_pos = next_line_start;
            }

            let (record, consumed) = match boundary {
                Some(idx) => (&data[..idx], idx + 1),
                // The record runs to the end of the data. Drop its terminating
                // newline so the last record of a file (or of a `par_iter` chunk)
                // has the same bytes as it would have in the middle of a file.
                None => (data.strip_suffix(b"\n").unwrap_or(data), data.len()),
            };
            self.pos += consumed;

            let record = record.strip_suffix(b"\r").unwrap_or(record);
            if record.is_empty() {
                continue;
            }
            return Some(parse_record_with_hint(record, is_multiline, self.encoding));
        }
    }
}
/// Returns the offset of the first record that starts on a line *after* the
/// line containing `from`, or `data.len()` when there is none.
///
/// A record start is a line whose first 23 bytes have the
/// `20YY-MM-DD HH:MM:SS.mmm` shape (only the fixed punctuation and the leading
/// "20" are checked). `from` may be any offset; values at or past the end of
/// `data` yield `data.len()` instead of panicking.
fn find_next_record_start(data: &[u8], from: usize) -> usize {
    let len = data.len();

    // Skip the remainder of the line `from` falls into; an out-of-range `from`
    // simply means there is nothing left to search.
    let Some(tail) = data.get(from..) else {
        return len;
    };
    let Some(first_nl) = memchr(b'\n', tail) else {
        return len;
    };
    let mut pos = from + first_nl + 1;

    loop {
        // Fewer than 23 bytes left cannot hold a timestamp.
        let Some(peek) = data.get(pos..pos + 23) else {
            return len;
        };
        let looks_like_timestamp = peek[0] == b'2'
            && peek[1] == b'0'
            && peek[4] == b'-'
            && peek[7] == b'-'
            && peek[10] == b' '
            && peek[13] == b':'
            && peek[16] == b':'
            && peek[19] == b'.';
        if looks_like_timestamp {
            return pos;
        }
        // Not a record start: advance to the beginning of the next line.
        match memchr(b'\n', &data[pos..]) {
            Some(nl) => pos += nl + 1,
            None => return len,
        }
    }
}
/// Parses one raw record whose encoding is not known in advance.
///
/// The record is treated as possibly spanning several lines and is decoded with
/// [`FileEncodingHint::Auto`]; see `parse_record_with_hint` for the parsing rules
/// and error conditions.
pub fn parse_record<'a>(record_bytes: &'a [u8]) -> Result<Sqllog<'a>, ParseError> {
    let may_span_lines = true;
    let hint = FileEncodingHint::Auto;
    parse_record_with_hint(record_bytes, may_span_lines, hint)
}
/// Parses the bytes of one record into a [`Sqllog`].
///
/// Layout expected on the first line: a 23-byte timestamp, a parenthesised
/// metadata section, then the content. The content may start with a short
/// `[TAG]` (no spaces, at most 32 bytes) which is split off into `tag`.
///
/// * `record_bytes` - the whole record, without its trailing line terminator.
/// * `is_multiline` - when `true`, only the bytes before the first `\n` are
///   treated as the first line; when `false`, the whole record is.
/// * `encoding_hint` - how the metadata and a non-UTF-8 tag are decoded. `Utf8`
///   falls back to lossy UTF-8, `Gb18030` decodes as GB18030 (then lossy UTF-8),
///   `Auto` tries UTF-8, then GB18030, then lossy UTF-8.
///
/// # Errors
/// Returns `ParseError::InvalidFormat` (with the first line rendered lossily)
/// when the first line is shorter than 23 bytes, its first 23 bytes are not valid
/// UTF-8, no `(` follows the timestamp, or no `)` follows that `(`.
fn parse_record_with_hint<'a>(
    record_bytes: &'a [u8],
    is_multiline: bool,
    encoding_hint: FileEncodingHint,
) -> Result<Sqllog<'a>, ParseError> {
    /// Lossy UTF-8 rendering, used as the decoder of last resort.
    fn lossy(bytes: &[u8]) -> Cow<'static, str> {
        Cow::Owned(String::from_utf8_lossy(bytes).into_owned())
    }

    /// Strict GB18030 decoding with a lossy UTF-8 fallback.
    fn gb18030_or_lossy(bytes: &[u8]) -> Cow<'static, str> {
        match GB18030.decode(bytes, DecoderTrap::Strict) {
            Ok(text) => Cow::Owned(text),
            Err(_) => lossy(bytes),
        }
    }

    // Isolate the first line (without a trailing '\r').
    let first_line_end = if is_multiline {
        memchr(b'\n', record_bytes).unwrap_or(record_bytes.len())
    } else {
        record_bytes.len()
    };
    let first_line = &record_bytes[..first_line_end];
    let first_line = first_line.strip_suffix(b"\r").unwrap_or(first_line);

    let invalid = || ParseError::InvalidFormat {
        raw: String::from_utf8_lossy(first_line).into_owned(),
    };

    if first_line.len() < 23 {
        return Err(invalid());
    }

    // The input is arbitrary bytes (this is reachable from the public
    // `parse_record`), so the timestamp must be validated before it becomes a `str`.
    let ts = match std::str::from_utf8(&first_line[..23]) {
        Ok(text) => Cow::Borrowed(text),
        Err(_) => return Err(invalid()),
    };

    // Metadata: from the first '(' after the timestamp to the first ") ", or,
    // when there is no ") ", to the last ')' on the line.
    let Some(open_rel) = memchr(b'(', &first_line[23..]) else {
        return Err(invalid());
    };
    let meta_start = 23 + open_rel;
    let close_rel = FINDER_CLOSE_META
        .find(&first_line[meta_start..])
        .or_else(|| memrchr(b')', &first_line[meta_start..]));
    let Some(close_rel) = close_rel else {
        return Err(invalid());
    };
    let meta_end = meta_start + close_rel;

    let meta_bytes = &first_line[meta_start + 1..meta_end];
    let meta_raw = match encoding_hint {
        FileEncodingHint::Utf8 => match simd_from_utf8(meta_bytes) {
            Ok(text) => Cow::Borrowed(text),
            Err(_) => lossy(meta_bytes),
        },
        FileEncodingHint::Gb18030 => gb18030_or_lossy(meta_bytes),
        FileEncodingHint::Auto => match simd_from_utf8(meta_bytes) {
            Ok(text) => Cow::Borrowed(text),
            Err(_) => gb18030_or_lossy(meta_bytes),
        },
    };

    // Content starts after the ')' and one optional space. Offsets computed on
    // `first_line` are valid for `record_bytes` because it is a prefix of it.
    let after_meta = meta_end + 1;
    let content_start = if first_line.get(after_meta) == Some(&b' ') {
        after_meta + 1
    } else {
        after_meta
    };

    let mut tag: Option<Cow<'a, str>> = None;
    let mut content_slice: &'a [u8] = record_bytes.get(content_start..).unwrap_or(&[]);

    if content_slice.first() == Some(&b'[') {
        // Byte 0 is '[', so any ']' found is at index >= 1.
        if let Some(close) = memchr(b']', content_slice) {
            let inner = &content_slice[1..close];
            if inner.len() <= 32 && !inner.contains(&b' ') {
                tag = Some(match simd_from_utf8(inner) {
                    Ok(text) => Cow::Borrowed(text),
                    // Same fallback order as the metadata for each hint.
                    Err(_) => match encoding_hint {
                        FileEncodingHint::Utf8 => lossy(inner),
                        FileEncodingHint::Gb18030 | FileEncodingHint::Auto => {
                            gb18030_or_lossy(inner)
                        }
                    },
                });
                // Drop the tag and the ASCII whitespace that follows it.
                let rest = &content_slice[close + 1..];
                let skip = rest
                    .iter()
                    .take_while(|b| b.is_ascii_whitespace())
                    .count();
                content_slice = &rest[skip..];
            }
        }
    }

    let content_raw = Cow::Borrowed(content_slice);
    Ok(Sqllog {
        ts,
        meta_raw,
        content_raw,
        tag,
        encoding: encoding_hint,
    })
}