#![allow(dead_code)]
use super::JsonValue;
use anyhow::{Context, Error, Result};
use futures::{Stream, StreamExt, future::ready, stream};
use std::io::BufRead;
/// Converts one item yielded by `BufRead::lines()` into an optional parse result.
///
/// * An I/O error while reading the line is passed through as `Some(Err(..))`.
/// * A line that is empty or contains only whitespace yields `None`, so callers using
///   `filter_map` simply skip it.
/// * Any other line is parsed with `JsonValue::parse_str` and returned as `Some(..)`.
fn process_line(line: std::io::Result<String>) -> Option<Result<JsonValue>> {
    let text = match line {
        Err(io_error) => return Some(Err(io_error.into())),
        Ok(text) => text,
    };
    if text.trim().is_empty() {
        None
    } else {
        Some(JsonValue::parse_str(&text))
    }
}
/// Lazily parses newline-delimited JSON from `reader`, yielding one result per non-blank line.
///
/// Blank (whitespace-only) lines are skipped but still counted. The 1-based line number in
/// each error's "error in line N" context therefore matches the line's position in the input.
/// Errors do not stop iteration; later lines are still produced.
pub fn read_ndjson_iter(reader: impl BufRead) -> impl Iterator<Item = Result<JsonValue>> {
    reader.lines().zip(1usize..).filter_map(|(line, line_number)| {
        // `?` drops blank lines (process_line returned None).
        let parsed = process_line(line)?;
        Some(parsed.with_context(|| format!("error in line {}", line_number)))
    })
}
pub fn read_ndjson_stream(reader: impl BufRead) -> impl Stream<Item = Result<JsonValue>> {
stream::iter(reader.lines().enumerate())
.map(|(index, line)| {
tokio::spawn(
async move { process_line(line).map(|r| r.with_context(|| format!("error in line {}", index + 1))) },
)
})
.buffered(num_cpus::get())
.filter_map(|f| {
ready(match f {
Ok(value) => value,
Err(e) => Some(Err(Error::from(e))),
})
})
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::StreamExt;
    use std::io::Cursor;

    /// Flattens an error and its chain of contexts/causes into one string per layer.
    fn join_errors(e: &Error) -> Vec<String> {
        e.chain().map(std::string::ToString::to_string).collect::<Vec<String>>()
    }

    /// Parses a single JSON document, used to build expected values.
    fn json_from_str<T: AsRef<str>>(s: T) -> Result<JsonValue> {
        JsonValue::parse_str(s.as_ref())
    }

    #[test]
    fn test_single_line() -> Result<()> {
        let data = r#"{"key": "value"}"#;
        let reader = Cursor::new(data);
        let mut iter = read_ndjson_iter(reader);
        assert_eq!(iter.next().unwrap()?, json_from_str(data)?);
        assert!(iter.next().is_none());
        Ok(())
    }

    #[test]
    fn test_multiple_lines() -> Result<()> {
        let data = r#"
{"key1": "value1"}
{"key2": "value2"}
{"key3": "value3"}
"#;
        let reader = Cursor::new(data.trim());
        let mut iter = read_ndjson_iter(reader);
        assert_eq!(iter.next().unwrap()?, json_from_str(r#"{"key1": "value1"}"#)?);
        assert_eq!(iter.next().unwrap()?, json_from_str(r#"{"key2": "value2"}"#)?);
        assert_eq!(iter.next().unwrap()?, json_from_str(r#"{"key3": "value3"}"#)?);
        assert!(iter.next().is_none());
        Ok(())
    }

    #[test]
    fn test_empty_lines() -> Result<()> {
        // Interior, trailing, and whitespace-only lines must all be skipped, not parsed.
        let data = "{\"key1\": \"value1\"}\n\n{\"key2\": \"value2\"}\n   \n\n{\"key3\": \"value3\"}\n\n";
        let reader = Cursor::new(data);
        let mut iter = read_ndjson_iter(reader);
        assert_eq!(iter.next().unwrap()?, json_from_str(r#"{"key1": "value1"}"#)?);
        assert_eq!(iter.next().unwrap()?, json_from_str(r#"{"key2": "value2"}"#)?);
        assert_eq!(iter.next().unwrap()?, json_from_str(r#"{"key3": "value3"}"#)?);
        assert!(iter.next().is_none());
        Ok(())
    }

    #[test]
    fn test_error_line_number_counts_blank_lines() {
        // Skipped blank lines still advance the line counter used in error messages.
        let data = "{\"key1\": \"value1\"}\n\n\nnot json\n";
        let vec = read_ndjson_iter(Cursor::new(data)).collect::<Vec<_>>();
        assert_eq!(vec.len(), 2);
        assert!(vec[0].is_ok());
        let chain = join_errors(vec[1].as_ref().unwrap_err());
        assert_eq!(chain[0], "error in line 4");
    }

    #[test]
    fn test_invalid_json() -> Result<()> {
        let data = "{\"key1\": \"value1\"}\n{invalid json}\n{\"key2\": \"value2\"}\n";
        let reader = Cursor::new(data.trim());
        let vec = read_ndjson_iter(reader).collect::<Vec<_>>();
        assert_eq!(vec.len(), 3);
        assert_eq!(vec[0].as_ref().unwrap(), &json_from_str(r#"{"key1": "value1"}"#)?);
        assert_eq!(
            join_errors(vec[1].as_ref().unwrap_err()),
            vec![
                "error in line 2",
                "while parsing JSON string '{invalid json}'",
                "while parsing JSON data",
                "while parsing JSON object",
                "while parsing object entries",
                "parsing object, expected '\"' or '}' at position 1: {"
            ]
        );
        assert_eq!(vec[2].as_ref().unwrap(), &json_from_str(r#"{"key2": "value2"}"#)?);
        Ok(())
    }

    #[test]
    fn test_mixed_valid_invalid_lines() -> Result<()> {
        let data = "{\"key1\": \"value1\"}\nnot a json\n{\"key2\": \"value2\"}\n";
        let reader = Cursor::new(data.trim());
        let vec = read_ndjson_iter(reader).collect::<Vec<_>>();
        assert_eq!(vec.len(), 3);
        assert_eq!(vec[0].as_ref().unwrap(), &json_from_str(r#"{"key1": "value1"}"#)?);
        assert_eq!(
            join_errors(vec[1].as_ref().unwrap_err()),
            vec![
                "error in line 2",
                "while parsing JSON string 'not a json'",
                "while parsing JSON data",
                "while parsing tag 'null'",
                "unexpected character while parsing tag 'null' at position 2: no"
            ]
        );
        assert_eq!(vec[2].as_ref().unwrap(), &json_from_str(r#"{"key2": "value2"}"#)?);
        Ok(())
    }

    #[test]
    fn test_empty_input() {
        let data = "";
        let reader = Cursor::new(data);
        let mut iter = read_ndjson_iter(reader);
        assert!(iter.next().is_none());
    }

    #[tokio::test]
    async fn test_read_stream_single_line() -> Result<()> {
        let data = r#"{"key": "value"}"#;
        let reader = Cursor::new(data);
        let stream = read_ndjson_stream(reader);
        let results: Vec<_> = stream.collect().await;
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].as_ref().unwrap(), &json_from_str(data)?);
        Ok(())
    }

    #[tokio::test]
    async fn test_read_stream_empty_lines() -> Result<()> {
        let data = "\n{\"key1\": \"value1\"}\n  \n{\"key2\": \"value2\"}\n";
        let results: Vec<_> = read_ndjson_stream(Cursor::new(data)).collect().await;
        assert_eq!(results.len(), 2);
        assert_eq!(results[0].as_ref().unwrap(), &json_from_str(r#"{"key1": "value1"}"#)?);
        assert_eq!(results[1].as_ref().unwrap(), &json_from_str(r#"{"key2": "value2"}"#)?);
        Ok(())
    }

    #[tokio::test]
    async fn test_read_stream_mixed_valid_invalid() -> Result<()> {
        let data = "{\"key1\": \"value1\"}\nnot json\n{\"key2\": \"value2\"}\n";
        let reader = Cursor::new(data);
        let results: Vec<_> = read_ndjson_stream(reader).collect().await;
        assert_eq!(results.len(), 3);
        assert_eq!(results[0].as_ref().unwrap(), &json_from_str(r#"{"key1": "value1"}"#)?);
        let err = results[1].as_ref().unwrap_err();
        let msg = join_errors(err);
        assert_eq!(
            msg,
            vec![
                "error in line 2",
                "while parsing JSON string 'not json'",
                "while parsing JSON data",
                "while parsing tag 'null'",
                "unexpected character while parsing tag 'null' at position 2: no"
            ]
        );
        assert_eq!(results[2].as_ref().unwrap(), &json_from_str(r#"{"key2": "value2"}"#)?);
        Ok(())
    }
}