asyncjsonstream 0.1.1

Async JSON stream reader for selective parsing of large payloads
Documentation
use asyncjsonstream::{AsyncJsonStreamReader, JsonToken};
use serde_json::Value;
use std::env;
use std::fs;
use std::path::PathBuf;
use std::time::Instant;
use tokio::io::BufReader;

/// Writes the command-line synopsis of this benchmark to standard error.
fn usage() {
    // The text contains no braces, so printing it through `{}` emits exactly
    // the same bytes as using it directly as the format string.
    const SYNOPSIS: &str =
        "Usage: bench_big_object --path <file> [--mode serde|async|async-light|both] [--repeat <n>]";
    eprintln!("{}", SYNOPSIS);
}

/// Parses `value` as a `u64`.
///
/// `name` is only used to label the value in the failure message.
///
/// # Panics
///
/// Panics with `Invalid <name>: <value>` when `value` is not a valid `u64`.
fn parse_u64(value: &str, name: &str) -> u64 {
    match value.parse::<u64>() {
        Ok(number) => number,
        Err(_) => panic!("Invalid {name}: {value}"),
    }
}

/// Maps the textual value of `--mode` to the corresponding [`Mode`].
///
/// Accepted spellings are `serde`, `async`, `async-light` and `both`; the
/// comparison is exact (case-sensitive).
///
/// # Panics
///
/// Panics with `Invalid mode: <value>` for any other input.
fn parse_mode(value: &str) -> Mode {
    // Spelling -> mode lookup table.
    const MODES: [(&str, Mode); 4] = [
        ("serde", Mode::Serde),
        ("async", Mode::Async),
        ("async-light", Mode::AsyncLight),
        ("both", Mode::Both),
    ];

    MODES
        .iter()
        .find(|(spelling, _)| *spelling == value)
        .map(|&(_, mode)| mode)
        .unwrap_or_else(|| panic!("Invalid mode: {value}"))
}

/// Selects which benchmark variant(s) `main` executes on every run.
///
/// Derives `Debug`, `PartialEq` and `Eq` in addition to `Clone`/`Copy` so the
/// value can be logged and compared, matching the derives on `Container`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Mode {
    /// Run only `bench_serde` (whole file parsed with `serde_json`).
    Serde,
    /// Run only `bench_async` (streaming reader, each row deserialized).
    Async,
    /// Run only `bench_async_light` (streaming reader, token-level scan).
    AsyncLight,
    /// Run `bench_serde` followed by `bench_async`; `main` does not run
    /// `bench_async_light` for this mode.
    Both,
}

fn bench_serde(path: &PathBuf) -> Result<(u64, u64, u128), Box<dyn std::error::Error>> {
    let start = Instant::now();
    let data = fs::read(path)?;
    let parsed: Value = serde_json::from_slice(&data)?;
    let rows = parsed
        .get("rows")
        .and_then(|v| v.as_array())
        .ok_or_else(|| "missing rows array".to_string())?;

    let mut checksum: u64 = 0;
    for row in rows {
        if let Some(id) = row.get("id").and_then(|v| v.as_u64()) {
            checksum = checksum.wrapping_add(id);
        }
    }

    Ok((rows.len() as u64, checksum, start.elapsed().as_millis()))
}

/// Benchmarks the streaming reader while still materialising every row:
/// each item of the top-level `rows` array is read with `deserialize_object`
/// and its `id` (when it is a `u64`) is added to a wrapping checksum.
///
/// Returns `(row_count, checksum, elapsed_ms)`; the elapsed time includes
/// opening the file.
///
/// # Errors
///
/// Propagates any error from opening the file or from the stream reader.
async fn bench_async(path: &PathBuf) -> Result<(u64, u64, u128), Box<dyn std::error::Error>> {
    let timer = Instant::now();
    let source = BufReader::new(tokio::fs::File::open(path).await?);
    let mut json = AsyncJsonStreamReader::new(source);

    let mut row_count: u64 = 0;
    let mut id_sum: u64 = 0;

    while let Some(key) = json.next_object_entry().await? {
        // Only the `rows` entry is inspected; other entries are left to the
        // reader. NOTE(review): presumably `next_object_entry` skips values
        // that were not consumed — confirm against the crate documentation.
        if key != "rows" {
            continue;
        }

        while json.start_array_item().await? {
            let row = json.deserialize_object().await?;
            // A missing or non-u64 id adds zero, i.e. leaves the sum as is.
            let id = row.get("id").and_then(|v| v.as_u64()).unwrap_or(0);
            id_sum = id_sum.wrapping_add(id);
            row_count += 1;
        }
    }

    Ok((row_count, id_sum, timer.elapsed().as_millis()))
}

/// Kind of JSON container that is currently open; `consume_value_from_token`
/// keeps a stack of these to match every end token with its start token.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum Container {
    /// Pushed on `JsonToken::StartObject`, expected back on `EndObject`.
    Object,
    /// Pushed on `JsonToken::StartArray`, expected back on `EndArray`.
    Array,
}

/// Reads the next token from `reader` and discards the complete JSON value
/// that starts with it (delegating to `consume_value_from_token`).
///
/// # Errors
///
/// Returns `"unexpected EOF while consuming value"` when the reader has no
/// more tokens, and propagates reader errors and errors from
/// `consume_value_from_token`.
async fn consume_value<R>(
    reader: &mut AsyncJsonStreamReader<R>,
) -> Result<(), Box<dyn std::error::Error>>
where
    R: tokio::io::AsyncRead + Unpin,
{
    match reader.next_token().await? {
        Some(first) => consume_value_from_token(reader, first).await,
        None => Err("unexpected EOF while consuming value".to_string().into()),
    }
}

/// Discards one complete JSON value whose first token, `token`, has already
/// been read from `reader`.
///
/// Scalars finish immediately. For objects and arrays, tokens are read until
/// the container opened by `token` (and everything nested inside it) has been
/// closed. `Key` and `EndObjectOrListItem` tokens never finish a value and are
/// simply skipped.
///
/// # Errors
///
/// * `"unexpected EndObject"` / `"unexpected EndArray"` when an end token does
///   not match the innermost open container (or none is open).
/// * `"unexpected EOF while consuming value"` when the tokens run out first.
/// * Any error reported by the reader.
async fn consume_value_from_token<R>(
    reader: &mut AsyncJsonStreamReader<R>,
    mut token: JsonToken,
) -> Result<(), Box<dyn std::error::Error>>
where
    R: tokio::io::AsyncRead + Unpin,
{
    // Containers that are currently open, innermost last.
    let mut open: Vec<Container> = Vec::new();

    loop {
        // `true` for tokens that can be the final token of a value: scalars
        // and container ends. The value is complete once such a token leaves
        // no container open.
        let finishes_value = match token {
            JsonToken::StartObject => {
                open.push(Container::Object);
                false
            }
            JsonToken::StartArray => {
                open.push(Container::Array);
                false
            }
            JsonToken::EndObject => {
                if open.pop() != Some(Container::Object) {
                    return Err("unexpected EndObject".into());
                }
                true
            }
            JsonToken::EndArray => {
                if open.pop() != Some(Container::Array) {
                    return Err("unexpected EndArray".into());
                }
                true
            }
            JsonToken::EndObjectOrListItem | JsonToken::Key(_) => false,
            JsonToken::String(_)
            | JsonToken::Number(_)
            | JsonToken::Boolean(_)
            | JsonToken::Null => true,
        };

        if finishes_value && open.is_empty() {
            return Ok(());
        }

        token = match reader.next_token().await? {
            Some(next) => next,
            None => return Err("unexpected EOF while consuming value".to_string().into()),
        };
    }
}

async fn bench_async_light(path: &PathBuf) -> Result<(u64, u64, u128), Box<dyn std::error::Error>> {
    let start = Instant::now();
    let file = tokio::fs::File::open(path).await?;
    let reader = BufReader::new(file);
    let mut reader = AsyncJsonStreamReader::new(reader);

    let mut rows: u64 = 0;
    let mut checksum: u64 = 0;

    let token = reader
        .next_token()
        .await?
        .ok_or_else(|| "empty input".to_string())?;
    if token != JsonToken::StartObject {
        return Err("expected root object".into());
    }

    loop {
        let token = reader
            .next_token()
            .await?
            .ok_or_else(|| "unexpected EOF in root object".to_string())?;
        match token {
            JsonToken::Key(key) => {
                if key == "rows" {
                    let next = reader
                        .next_token()
                        .await?
                        .ok_or_else(|| "unexpected EOF before rows array".to_string())?;
                    if next != JsonToken::StartArray {
                        return Err("expected rows array".into());
                    }

                    loop {
                        let token = reader
                            .next_token()
                            .await?
                            .ok_or_else(|| "unexpected EOF in rows array".to_string())?;
                        match token {
                            JsonToken::EndArray => break,
                            JsonToken::EndObjectOrListItem => continue,
                            JsonToken::StartObject => loop {
                                let token = reader.next_token().await?.ok_or_else(|| {
                                    "unexpected EOF while reading row".to_string()
                                })?;
                                match token {
                                    JsonToken::EndObject => {
                                        rows += 1;
                                        break;
                                    }
                                    JsonToken::EndObjectOrListItem => continue,
                                    JsonToken::Key(field) => {
                                        if field == "id" {
                                            let token =
                                                reader.next_token().await?.ok_or_else(|| {
                                                    "unexpected EOF reading id".to_string()
                                                })?;
                                            match token {
                                                JsonToken::Number(n) => {
                                                    if let Ok(id) = n.parse::<u64>() {
                                                        checksum = checksum.wrapping_add(id);
                                                    }
                                                }
                                                JsonToken::String(s) => {
                                                    if let Ok(id) = s.parse::<u64>() {
                                                        checksum = checksum.wrapping_add(id);
                                                    }
                                                }
                                                JsonToken::Null => {}
                                                other => {
                                                    return Err(format!(
                                                        "unexpected id token: {other:?}"
                                                    )
                                                    .into());
                                                }
                                            }
                                        } else {
                                            consume_value(&mut reader).await?;
                                        }
                                    }
                                    other => {
                                        return Err(
                                            format!("unexpected token in row: {other:?}").into()
                                        );
                                    }
                                }
                            },
                            other => consume_value_from_token(&mut reader, other).await?,
                        }
                    }
                } else {
                    consume_value(&mut reader).await?;
                }
            }
            JsonToken::EndObject => break,
            JsonToken::EndObjectOrListItem => continue,
            other => {
                return Err(format!("unexpected token in root: {other:?}").into());
            }
        }
    }

    Ok((rows, checksum, start.elapsed().as_millis()))
}

/// Entry point: parses `--path`, `--mode` and `--repeat`, then runs the
/// selected benchmark(s) `repeat` times, printing one result line per
/// benchmark and run.
///
/// Invalid command lines (unknown argument, missing option value, missing
/// `--path`) print the usage text and panic; `--help`/`-h` prints the usage
/// text and exits successfully. Benchmark errors are returned from `main`.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Fetches the value that must follow `flag`; prints the usage text and
    // panics when the command line ends right after the flag.
    fn value_for<'a>(flag: &str, rest: &mut std::slice::Iter<'a, String>) -> &'a str {
        match rest.next() {
            Some(value) => value.as_str(),
            None => {
                usage();
                panic!("Missing value for {flag}");
            }
        }
    }

    let mut path: Option<PathBuf> = None;
    let mut mode = Mode::Both;
    let mut repeat: u64 = 1;

    let args: Vec<String> = env::args().skip(1).collect();
    let mut remaining = args.iter();
    // `while let` rather than `for`: option values are pulled from the same
    // iterator inside the loop body.
    while let Some(arg) = remaining.next() {
        match arg.as_str() {
            "--path" => path = Some(PathBuf::from(value_for("--path", &mut remaining))),
            "--mode" => mode = parse_mode(value_for("--mode", &mut remaining)),
            "--repeat" => repeat = parse_u64(value_for("--repeat", &mut remaining), "repeat"),
            "--help" | "-h" => {
                usage();
                return Ok(());
            }
            other => {
                usage();
                panic!("Unknown argument: {other}");
            }
        }
    }

    let path = match path {
        Some(given) => given,
        None => {
            usage();
            panic!("--path is required");
        }
    };

    // `mode` never changes after parsing, so the selection is computed once.
    let run_serde = matches!(mode, Mode::Serde | Mode::Both);
    let run_async = matches!(mode, Mode::Async | Mode::Both);
    let run_async_light = matches!(mode, Mode::AsyncLight);

    for run in 1..=repeat {
        if run_serde {
            let (rows, checksum, ms) = bench_serde(&path)?;
            println!("run={run} mode=serde rows={rows} checksum={checksum} elapsed_ms={ms}");
        }
        if run_async {
            let (rows, checksum, ms) = bench_async(&path).await?;
            println!("run={run} mode=async rows={rows} checksum={checksum} elapsed_ms={ms}");
        }
        if run_async_light {
            let (rows, checksum, ms) = bench_async_light(&path).await?;
            println!("run={run} mode=async-light rows={rows} checksum={checksum} elapsed_ms={ms}");
        }
    }

    Ok(())
}