use asmjson::{Sax, sax_parser};
use memmap2::Mmap;
use rayon::prelude::*;
use std::{
fs,
io::{BufWriter, Write},
path::Path,
time::Instant,
};
/// Tally of JSON string values and object keys seen while parsing.
///
/// Used both as the per-document SAX sink and as the aggregate total
/// accumulated across lines and chunks.
#[derive(Default, Debug)]
struct StringCounter {
    /// Number of string *values* encountered (raw and escaped alike).
    strings: usize,
    /// Number of object keys encountered (raw and escaped alike).
    keys: usize,
}
/// SAX-style event sink that counts string values and object keys and
/// ignores every other JSON event.
///
/// NOTE(review): `Sax` comes from the `asmjson` crate; assumes the parser
/// invokes exactly one callback per JSON token — confirm against the crate.
impl<'src> Sax<'src> for StringCounter {
    /// Value yielded by `finish`; here the counter itself.
    type Output = Self;
    // Scalar events we don't care about.
    fn null(&mut self) {}
    fn bool_val(&mut self, _v: bool) {}
    fn number(&mut self, _s: &'src str) {}
    // A string value borrowed directly from the source buffer.
    fn string(&mut self, _s: &'src str) {
        self.strings += 1;
    }
    // A string value that contained escape sequences (hence the shorter,
    // non-'src lifetime); counted the same as a borrowed string.
    fn escaped_string(&mut self, _s: &str) {
        self.strings += 1;
    }
    // Object keys are tallied separately from string values.
    fn key(&mut self, _s: &'src str) {
        self.keys += 1;
    }
    fn escaped_key(&mut self, _s: &str) {
        self.keys += 1;
    }
    // Structural events carry no count information here.
    fn start_object(&mut self) {}
    fn end_object(&mut self) {}
    fn start_array(&mut self) {}
    fn end_array(&mut self) {}
    /// Called at the end of a document; returning `Some` hands the
    /// accumulated counts back to the caller of `parse`.
    fn finish(self) -> Option<Self::Output> {
        Some(self)
    }
}
const CHUNK_SIZE: usize = 1 << 20;
/// Parse every non-empty line of `chunk` as a standalone JSON document and
/// sum the string/key counts of each document that parses successfully.
///
/// Lines that fail to parse (where `parse` returns `None`) are skipped
/// silently, exactly as in the line-by-line loop this replaces.
fn parse_chunk(chunk: &str, parser: asmjson::SaxParser) -> StringCounter {
    chunk
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty())
        .filter_map(|line| parser.parse(line, StringCounter::default()))
        .fold(StringCounter::default(), |mut total, counted| {
            total.strings += counted.strings;
            total.keys += counted.keys;
            total
        })
}
/// Split `data` into chunks of roughly `chunk_size` bytes, extending each
/// chunk forward to just past the next `\n` so that no line ever straddles a
/// chunk boundary. The final chunk simply runs to the end of the input.
///
/// Note: a chunk that nominally ends right after a `\n` is still extended to
/// the following newline; chunks may therefore exceed `chunk_size` by up to
/// one full line.
fn split_at_newlines(data: &[u8], chunk_size: usize) -> Vec<&[u8]> {
    let mut pieces = Vec::new();
    let mut begin = 0;
    while begin < data.len() {
        let tentative = data.len().min(begin + chunk_size);
        // Default to consuming the rest of the input; only a newline found
        // past the tentative end produces an earlier cut point.
        let mut cut = data.len();
        if tentative < data.len() {
            if let Some(offset) = data[tentative..].iter().position(|&b| b == b'\n') {
                cut = tentative + offset + 1; // include the '\n' itself
            }
        }
        pieces.push(&data[begin..cut]);
        begin = cut;
    }
    pieces
}
/// Exact byte length of each generated JSONL line, newline included
/// (the fixed-width format string below produces 99 bytes + '\n').
const LINE_BYTES: u64 = 100;
/// Total size of the generated benchmark input: 1 GiB.
const TARGET_BYTES: u64 = 1 << 30;
/// Location of the generated benchmark input file.
const FILE_PATH: &str = "/tmp/file.jsonl";
/// Generate the JSONL benchmark input at `path`.
///
/// Writes `TARGET_BYTES / LINE_BYTES` lines, each a small three-field JSON
/// object padded with zero-filled 12-digit counters so every line is exactly
/// `LINE_BYTES` bytes including its trailing newline. Panics on any I/O
/// failure, which is acceptable for a benchmark binary.
fn create_test_file(path: &Path) {
    let n_lines = TARGET_BYTES / LINE_BYTES;
    println!(
        "generating {path} ({n_lines} lines, {} MiB) …",
        TARGET_BYTES >> 20,
        path = path.display(),
    );
    let started = Instant::now();
    // 4 MiB buffer keeps the write syscall count low for ~10M tiny lines.
    let mut writer = BufWriter::with_capacity(
        4 << 20,
        fs::File::create(path).expect("cannot create file"),
    );
    for i in 0..n_lines {
        writeln!(
            writer,
            "{{\"identifier\":\"user{i:012}\",\"description\":\"item{i:012}\",\"subcategory\":\"type{i:012}\"}}"
        )
        .expect("write failed");
    }
    writer.flush().expect("flush failed");
    println!(" done in {:.2?}", started.elapsed());
}
/// End-to-end benchmark: generate a ~1 GiB JSONL file, memory-map it, split
/// it into newline-aligned chunks, and count JSON keys/strings in parallel
/// with rayon, reporting throughput in GiB/s.
fn main() {
    let path = Path::new(FILE_PATH);
    create_test_file(path);
    let file = fs::File::open(path).expect("cannot open file");
    // SAFETY: the mapping is only ever read, and backs a file this process
    // just finished writing. NOTE(review): mmap is unsound if another process
    // truncates or rewrites the file while mapped — confirm nothing else
    // touches /tmp/file.jsonl during the run.
    let mmap = unsafe { Mmap::map(&file) }.expect("mmap failed");
    // Chunk boundaries always fall just after a '\n', so every chunk holds
    // whole lines and can be parsed independently of its neighbours.
    let chunks = split_at_newlines(&mmap, CHUNK_SIZE);
    println!(
        "parsing {} MiB → {} chunk(s) of ~{} KiB …",
        mmap.len() >> 20,
        chunks.len(),
        CHUNK_SIZE / 1024,
    );
    let bytes = mmap.len();
    let t = Instant::now();
    let parser = sax_parser();
    // Fan chunks out across the rayon pool: each worker counts its own
    // chunk's lines, then the per-chunk counters are summed in `reduce`.
    let totals: StringCounter = chunks
        .par_iter()
        .map(|chunk| {
            let s = std::str::from_utf8(chunk).expect("non-UTF-8 data in chunk");
            parse_chunk(s, parser)
        })
        .reduce(StringCounter::default, |mut a, b| {
            a.strings += b.strings;
            a.keys += b.keys;
            a
        });
    let elapsed = t.elapsed();
    // Throughput over the whole mapped file, in binary gigabytes per second.
    let gib_per_sec = bytes as f64 / elapsed.as_secs_f64() / (1u64 << 30) as f64;
    println!(" done in {:.2?} ({:.2} GiB/s)", elapsed, gib_per_sec);
    println!("keys found : {}", totals.keys);
    println!("strings found: {}", totals.strings);
}