use memchr::memmem::Finder;
use memchr::{memchr, memrchr};
#[cfg(unix)]
use memmap2::Advice;
use memmap2::Mmap;
use simdutf8::basic::from_utf8 as simd_from_utf8;
use std::borrow::Cow;
use std::fs::File;
use std::path::Path;
use std::sync::LazyLock;
use crate::error::ParseError;
use crate::sqllog::Sqllog;
use encoding::all::GB18030;
use encoding::{DecoderTrap, Encoding};
/// Searcher for `") "`, the closing parenthesis of a record header's metadata
/// section followed by a space.
static FINDER_CLOSE_META: LazyLock<Finder<'static>> =
    LazyLock::new(|| Finder::new(") ".as_bytes()));
/// Searcher for `"\n20"`: a newline followed by what may be the start of a
/// `20YY-...` timestamp, i.e. a candidate record boundary.
static FINDER_RECORD_START: LazyLock<Finder<'static>> =
    LazyLock::new(|| Finder::new("\n20".as_bytes()));
/// Encoding assumed for the non-ASCII text inside a log file.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(crate) enum FileEncodingHint {
    /// Encoding not known in advance; decoding tries UTF-8 first.
    #[default]
    Auto,
    /// The file was detected (by sampling) as UTF-8.
    Utf8,
    /// The file was detected as not UTF-8 and is treated as GB18030.
    Gb18030,
}
/// Parser over a memory-mapped log file.
///
/// Construct with `LogParser::from_path`, then iterate records with `iter`,
/// `par_iter`, or collect record offsets with `index`.
pub struct LogParser {
    /// Read-only memory map of the whole file.
    mmap: Mmap,
    /// Encoding guessed in `from_path` from samples of the file's head and tail.
    encoding: FileEncodingHint,
}
/// Byte offsets at which records start in a log file.
pub struct RecordIndex {
    /// Start offset of each record; ascending when built by `LogParser::index`.
    pub(crate) offsets: Vec<usize>,
}

impl RecordIndex {
    /// Number of record offsets held by the index.
    pub fn len(&self) -> usize {
        let RecordIndex { offsets } = self;
        offsets.len()
    }

    /// Returns `true` when the index holds no record offsets.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
impl LogParser {
    /// Opens and memory-maps the log file at `path`, guessing its encoding.
    ///
    /// The encoding is sniffed from the first 64 KiB and the last 4 KiB of the
    /// file. If both samples are valid UTF-8 the file is treated as UTF-8,
    /// otherwise as GB18030. A multi-byte character cut in half by a sample
    /// edge does not count as invalid UTF-8.
    ///
    /// # Errors
    ///
    /// Returns [`ParseError::IoError`] if the file cannot be opened or mapped.
    pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Self, ParseError> {
        const HEAD_SAMPLE_LEN: usize = 64 * 1024;
        const TAIL_SAMPLE_LEN: usize = 4 * 1024;

        let file = File::open(path).map_err(|e| ParseError::IoError(e.to_string()))?;
        // SAFETY: `Mmap::map` is unsafe because another process could modify or
        // truncate the file while it is mapped; this parser assumes the log file
        // is not changed underneath it.
        let mmap = unsafe { Mmap::map(&file).map_err(|e| ParseError::IoError(e.to_string()))? };
        // Read-ahead hint only; a failure merely costs performance.
        #[cfg(unix)]
        let _ = mmap.advise(Advice::Sequential);

        let head_size = mmap.len().min(HEAD_SAMPLE_LEN);
        let tail_start = mmap.len().saturating_sub(TAIL_SAMPLE_LEN).max(head_size);
        // Fixed-size samples can split a multi-byte character at either edge.
        // A strict check would then misreport a UTF-8 file as GB18030, so the
        // split character is tolerated at both ends.
        let head_ok = sample_is_utf8(&mmap[..head_size]);
        let tail_ok = sample_is_utf8(skip_utf8_continuations(&mmap[tail_start..]));
        let encoding = if head_ok && tail_ok {
            FileEncodingHint::Utf8
        } else {
            FileEncodingHint::Gb18030
        };
        Ok(Self { mmap, encoding })
    }

    /// Returns a sequential iterator over every record in the file.
    pub fn iter(&self) -> LogIterator<'_> {
        LogIterator {
            data: &self.mmap,
            pos: 0,
            encoding: self.encoding,
        }
    }

    /// Collects the byte offset of every record start in the file.
    ///
    /// Offset 0 is included only when the file begins with a timestamp. Every
    /// other offset is a position right after a newline where a timestamp
    /// begins.
    pub fn index(&self) -> RecordIndex {
        let data: &[u8] = &self.mmap;
        let mut offsets: Vec<usize> = Vec::new();
        if data.len() >= 23 && is_timestamp_start(&data[0..23]) {
            offsets.push(0);
        }
        let mut pos: usize = 0;
        loop {
            let next = find_next_record_start(data, pos);
            if next >= data.len() {
                break;
            }
            if offsets.last() != Some(&next) {
                offsets.push(next);
            }
            // Resume the search just past this record start.
            pos = next.saturating_add(1);
        }
        RecordIndex { offsets }
    }

    /// Parses all records in parallel with rayon.
    ///
    /// Files smaller than 32 MiB are parsed as a single chunk. Larger files are
    /// split into about `rayon::current_num_threads()` chunks. Each split point
    /// is moved forward to the next record start, so no record is cut in two.
    pub fn par_iter(
        &self,
    ) -> impl rayon::iter::ParallelIterator<Item = Result<Sqllog<'_>, ParseError>> + '_ {
        use rayon::prelude::*;
        const PAR_THRESHOLD: usize = 32 * 1024 * 1024;
        let data: &[u8] = &self.mmap;
        let encoding = self.encoding;
        let bounds: Vec<(usize, usize)> = if data.is_empty() {
            Vec::new()
        } else if data.len() < PAR_THRESHOLD {
            vec![(0, data.len())]
        } else {
            let num_threads = rayon::current_num_threads().max(1);
            let chunk_size = (data.len() / num_threads).max(1);
            let mut starts: Vec<usize> = vec![0];
            for i in 1..num_threads {
                let boundary = find_next_record_start(data, i * chunk_size);
                if boundary < data.len() {
                    starts.push(boundary);
                }
            }
            starts.push(data.len());
            // Neighbouring split points can snap to the same record start.
            starts.dedup();
            starts.windows(2).map(|w| (w[0], w[1])).collect()
        };
        bounds
            .into_par_iter()
            .flat_map_iter(move |(start, end)| LogIterator {
                data: &data[start..end],
                pos: 0,
                encoding,
            })
    }
}

/// Returns `true` if `sample` is valid UTF-8, tolerating one multi-byte
/// sequence that is cut off at the very end of the sample.
fn sample_is_utf8(sample: &[u8]) -> bool {
    if simd_from_utf8(sample).is_ok() {
        return true;
    }
    // `error_len() == None` means the only defect is an incomplete sequence at
    // the end of the input, i.e. an artifact of cutting the sample.
    matches!(std::str::from_utf8(sample), Err(e) if e.error_len().is_none())
}

/// Skips at most three leading UTF-8 continuation bytes (`0b10xx_xxxx`), so a
/// sample taken mid-file starts on a character boundary.
fn skip_utf8_continuations(sample: &[u8]) -> &[u8] {
    let skip = sample
        .iter()
        .take(3)
        .take_while(|&&b| b & 0xC0 == 0x80)
        .count();
    &sample[skip..]
}
/// Sequential iterator over the records of a byte buffer.
///
/// Produced by `LogParser::iter` for the whole file, and by
/// `LogParser::par_iter` for each chunk.
pub struct LogIterator<'a> {
    /// Bytes being iterated (the whole file or one chunk of it).
    data: &'a [u8],
    /// Offset in `data` where the next record begins.
    pos: usize,
    /// Encoding hint forwarded to the record parser.
    encoding: FileEncodingHint,
}

impl<'a> Iterator for LogIterator<'a> {
    type Item = Result<Sqllog<'a>, ParseError>;

    /// Yields the next non-empty record.
    ///
    /// A record ends at the first newline that is directly followed by a
    /// timestamp, or at the end of `data`. In both cases the terminating `\n`
    /// (and a `\r` before it) is not part of the record. A record therefore
    /// looks the same whether it is followed by another record, ends the file,
    /// or ends a `par_iter` chunk.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if self.pos >= self.data.len() {
                return None;
            }
            let data = &self.data[self.pos..];
            let (record_end, next_start, is_multiline) = match memchr(b'\n', data) {
                None => (data.len(), data.len(), false),
                Some(first_nl) => {
                    let ts_start = first_nl + 1;
                    if ts_start + 23 <= data.len()
                        && is_timestamp_start(&data[ts_start..ts_start + 23])
                    {
                        // Fast path: a single-line record directly followed by the next.
                        (first_nl, ts_start, false)
                    } else {
                        // Multi-line record: find the first "\n20" whose "20" opens a
                        // full timestamp; that newline is the record boundary.
                        let boundary = FINDER_RECORD_START
                            .find_iter(&data[ts_start..])
                            .map(|candidate| ts_start + candidate)
                            .find(|&nl| {
                                let abs_ts = nl + 1;
                                abs_ts + 23 <= data.len()
                                    && is_timestamp_start(&data[abs_ts..abs_ts + 23])
                            });
                        match boundary {
                            Some(idx) => (idx, idx + 1, true),
                            None => {
                                // The record runs to the end of `data`. Drop the single
                                // newline that terminates it, mirroring the boundary-newline
                                // case above. Otherwise records at end-of-file and at
                                // `par_iter` chunk edges would keep a stray "\n".
                                let end = if data.ends_with(b"\n") {
                                    data.len() - 1
                                } else {
                                    data.len()
                                };
                                (end, data.len(), true)
                            }
                        }
                    }
                }
            };
            self.pos += next_start;
            let record_slice = &data[..record_end];
            // CRLF files: drop the '\r' left before the stripped '\n'.
            let record_slice = record_slice.strip_suffix(b"\r").unwrap_or(record_slice);
            if record_slice.is_empty() {
                // e.g. blank lines before the first record.
                continue;
            }
            return Some(parse_record_with_hint(
                record_slice,
                is_multiline,
                self.encoding,
            ));
        }
    }
}
/// Finds the next record start after `from`.
///
/// Returns the smallest offset `p` such that `data[p - 1]` is a newline at or
/// after `from` and a timestamp begins at `p`. Returns `data.len()` when there
/// is no such offset.
///
/// Panics if `from > data.len()`.
fn find_next_record_start(data: &[u8], from: usize) -> usize {
    let begins_record =
        |at: usize| at + 23 <= data.len() && is_timestamp_start(&data[at..at + 23]);

    let Some(nl_offset) = memchr(b'\n', &data[from..]) else {
        return data.len();
    };
    let line_start = from + nl_offset + 1;
    if begins_record(line_start) {
        return line_start;
    }
    // Slow path: scan the later "\n20" hits until one opens a full timestamp.
    FINDER_RECORD_START
        .find_iter(&data[line_start..])
        .map(|hit| line_start + hit + 1)
        .find(|&candidate| begins_record(candidate))
        .unwrap_or(data.len())
}
/// Parses one raw record (header line plus any continuation lines).
///
/// No encoding hint is available here, so text fields are decoded with
/// `FileEncodingHint::Auto`.
///
/// # Errors
///
/// Returns [`ParseError::InvalidFormat`] when the header line cannot be parsed.
pub fn parse_record<'a>(record_bytes: &'a [u8]) -> Result<Sqllog<'a>, ParseError> {
    let spans_lines = record_bytes.contains(&b'\n');
    parse_record_with_hint(record_bytes, spans_lines, FileEncodingHint::Auto)
}
/// Parses one record into a [`Sqllog`], borrowing from `record_bytes` where
/// possible.
///
/// The first line must look like `<23-byte timestamp> ... (<meta>) <content>`.
/// The metadata ends at the first `") "` after the `(`, or failing that at
/// the last `)`. The content runs from after the metadata to the end of the
/// record, so continuation lines are included. A leading `[tag]` of at most
/// 32 bytes without spaces is split off into `tag`, together with the ASCII
/// whitespace that follows it.
///
/// `is_multiline` is a hint that the record may contain `'\n'`. When it is
/// `false` the whole record is treated as the first line.
///
/// # Errors
///
/// Returns `ParseError::InvalidFormat` when the first line is shorter than 23
/// bytes, its timestamp bytes are not valid UTF-8, or the metadata
/// parentheses are missing.
fn parse_record_with_hint<'a>(
    record_bytes: &'a [u8],
    is_multiline: bool,
    encoding_hint: FileEncodingHint,
) -> Result<Sqllog<'a>, ParseError> {
    // Maximum byte length of a `[tag]`'s inner text.
    const MAX_TAG_LEN: usize = 32;

    let first_line_end = if is_multiline {
        memchr(b'\n', record_bytes).unwrap_or(record_bytes.len())
    } else {
        record_bytes.len()
    };
    let first_line = &record_bytes[..first_line_end];
    let first_line = first_line.strip_suffix(b"\r").unwrap_or(first_line);

    if first_line.len() < 23 {
        return Err(make_invalid_format_error(first_line));
    }
    // Validate instead of `from_utf8_unchecked`. `is_timestamp_start` checks
    // only the separator bytes, and `parse_record` accepts arbitrary input, so
    // these bytes are not guaranteed to be UTF-8.
    let Ok(ts) = simd_from_utf8(&first_line[..23]) else {
        return Err(make_invalid_format_error(first_line));
    };

    let Some(open_offset) = memchr(b'(', &first_line[23..]) else {
        return Err(make_invalid_format_error(first_line));
    };
    let meta_start = 23 + open_offset;
    let from_open = &first_line[meta_start..];
    let Some(close_offset) = FINDER_CLOSE_META
        .find(from_open)
        .or_else(|| memrchr(b')', from_open))
    else {
        return Err(make_invalid_format_error(first_line));
    };
    let meta_end = meta_start + close_offset;

    let meta_bytes = &first_line[meta_start + 1..meta_end];
    // The `Utf8` hint is only a guess from head/tail samples, so it is
    // validated too. The original unchecked conversion was undefined behavior
    // on invalid bytes.
    let meta_raw = match encoding_hint {
        FileEncodingHint::Utf8 => decode_utf8_or_fallback(meta_bytes, false),
        FileEncodingHint::Gb18030 => decode_gb18030(meta_bytes),
        FileEncodingHint::Auto => decode_utf8_or_fallback(meta_bytes, true),
    };

    // Skip the ')' and, if present, the single space after it.
    let mut content_start = meta_end + 1;
    if first_line.get(content_start) == Some(&b' ') {
        content_start += 1;
    }
    let mut content: &'a [u8] = record_bytes.get(content_start..).unwrap_or_default();

    let mut tag: Option<Cow<'a, str>> = None;
    if content.first() == Some(&b'[') {
        // A tag's inner text is at most MAX_TAG_LEN bytes, so its ']' can only
        // sit in the first MAX_TAG_LEN + 2 bytes. Bound the search instead of
        // scanning a possibly huge multi-line body.
        let window = &content[..content.len().min(MAX_TAG_LEN + 2)];
        if let Some(close) = memchr(b']', window) {
            let inner = &content[1..close];
            if !inner.contains(&b' ') {
                tag = Some(match encoding_hint {
                    FileEncodingHint::Gb18030 => decode_utf8_or_fallback(inner, true),
                    FileEncodingHint::Utf8 | FileEncodingHint::Auto => {
                        decode_utf8_or_fallback(inner, false)
                    }
                });
                content = content[close + 1..].trim_ascii_start();
            }
        }
    }

    Ok(Sqllog {
        ts: Cow::Borrowed(ts),
        meta_raw,
        content_raw: Cow::Borrowed(content),
        tag,
        encoding: encoding_hint,
    })
}

/// Borrows `bytes` as `str` when they are valid UTF-8. Otherwise decodes them
/// as GB18030 (only if `try_gb18030`), and as a last resort converts them
/// lossily.
fn decode_utf8_or_fallback(bytes: &[u8], try_gb18030: bool) -> Cow<'_, str> {
    match simd_from_utf8(bytes) {
        Ok(s) => Cow::Borrowed(s),
        Err(_) if try_gb18030 => decode_gb18030(bytes),
        Err(_) => String::from_utf8_lossy(bytes),
    }
}

/// Decodes `bytes` strictly as GB18030, falling back to lossy UTF-8.
fn decode_gb18030(bytes: &[u8]) -> Cow<'_, str> {
    match GB18030.decode(bytes, DecoderTrap::Strict) {
        Ok(s) => Cow::Owned(s),
        Err(_) => String::from_utf8_lossy(bytes),
    }
}
// Masks over a `YYYY-MM-DD HH:MM:SS.mmm` prefix, read as little-endian u64
// words. The low word (bytes 0..8) checks the leading "20" (bytes 0-1) and the
// '-' separators (bytes 4 and 7).
const LO_MASK: u64 = 0xFF00_00FF_0000_FFFF;
const LO_EXPECTED: u64 = 0x2D00_002D_0000_3032;
// The high word (bytes 8..16) checks the ' ' (byte 10) and the first ':' (byte 13).
const HI_MASK: u64 = 0x0000_FF00_00FF_0000;
const HI_EXPECTED: u64 = 0x0000_3A00_0020_0000;

/// Cheaply tests whether `bytes` starts with a `20YY-MM-DD HH:MM:SS.mmm` timestamp.
///
/// Only the leading `"20"` and the separator bytes are inspected; digit
/// positions are not validated. `bytes` should be at least 23 bytes long
/// (asserted in debug builds only); shorter input can panic.
#[inline(always)]
fn is_timestamp_start(bytes: &[u8]) -> bool {
    debug_assert!(bytes.len() >= 23);
    let read_word = |at: usize| -> u64 {
        let mut word = [0u8; 8];
        word.copy_from_slice(&bytes[at..at + 8]);
        u64::from_le_bytes(word)
    };
    let lo = read_word(0);
    let hi = read_word(8);
    let date_part_ok = lo & LO_MASK == LO_EXPECTED;
    let time_part_ok = hi & HI_MASK == HI_EXPECTED;
    date_part_ok && time_part_ok && bytes[16] == b':' && bytes[19] == b'.'
}
/// Builds the error for a record whose header cannot be parsed. The error
/// keeps a lossily decoded copy of the offending bytes.
#[cold]
fn make_invalid_format_error(raw_bytes: &[u8]) -> ParseError {
    let raw = String::from_utf8_lossy(raw_bytes).into_owned();
    ParseError::InvalidFormat { raw }
}