#![forbid(unsafe_code)]
use std::str::FromStr;
use bytes::BytesMut;
use serde_json::{Map, Value};
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt};
/// Initial capacity (in bytes) of the reader's internal buffer, the amount reserved
/// when the buffer must grow while preserving its contents, and the capacity it is
/// reset to once a grown buffer has been fully consumed.
const INTERNAL_BUFFER_SIZE: usize = 8192;
#[derive(Error, Debug)]
pub enum AsyncJsonStreamReaderError {
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
#[error("JSON error: {error}, position: {position}")]
JsonError { error: String, position: usize },
#[error("Unexpected token: expected {expected}, found {found}")]
UnexpectedToken {
expected: &'static str,
found: String,
},
#[error("Internal state error: {message}")]
InternalState { message: String },
}
/// Position of the high-level object-walking API within the current object.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum ReaderState {
    /// No object has been opened yet; the next high-level call opens one.
    Start,
    /// Between entries: the next significant token is a key, a `,` or the closing `}`.
    Object,
    /// A key has just been returned and its value has not been consumed yet.
    Value,
}
/// A lexical token produced by [`AsyncJsonStreamReader::next_token`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum JsonToken {
    /// `{`
    StartObject,
    /// `,` — separates object entries and array items alike.
    EndObjectOrListItem,
    /// `}`
    EndObject,
    /// `[`
    StartArray,
    /// `]`
    EndArray,
    /// A string immediately followed (after optional whitespace) by `:`; the colon
    /// has been consumed.
    Key(String),
    /// A string that is not followed by `:`.
    String(String),
    /// The raw textual form of a number, as it appeared in the input.
    Number(String),
    /// `true` or `false`.
    Boolean(bool),
    /// `null`
    Null,
}
/// Pull-based, incremental JSON tokenizer over an async byte source.
///
/// Bytes are read from `reader` into `buffer` on demand; `position` is the index of
/// the next unconsumed byte in `buffer`. On top of the token-level API
/// (`next_token` / `peek_token`) there is a small object-walking API
/// (`next_object_entry`, `skip_to_key`, `read_*`) whose progress is tracked in `state`.
pub struct AsyncJsonStreamReader<R> {
// Underlying async byte source.
reader: R,
// Bytes read from `reader`; cleared only once fully consumed and only when
// `preserve_buffer` is false.
buffer: BytesMut,
// Index in `buffer` of the next byte to examine.
position: usize,
// Number of currently open `{` / `[` seen by `next_token`.
depth: usize,
// Where the object-walking API currently is (see `ReaderState`).
state: ReaderState,
// When true, refills append instead of discarding consumed bytes, so saved
// positions stay valid (used by `peek_token` and `deserialize_object`).
preserve_buffer: bool,
}
impl<R: AsyncRead + Unpin> AsyncJsonStreamReader<R> {
/// Wraps `reader`. Nothing is read until the first token is requested.
pub fn new(reader: R) -> Self {
    let buffer = BytesMut::with_capacity(INTERNAL_BUFFER_SIZE);
    Self {
        reader,
        buffer,
        position: 0,
        depth: 0,
        state: ReaderState::Start,
        preserve_buffer: false,
    }
}
/// Returns the next token without consuming it.
///
/// The cursor, nesting depth, API state and buffer-preservation flag are
/// snapshotted. The token is then read with preservation enabled, so any refill
/// appends to the buffer and the saved cursor stays valid. Finally the snapshot is
/// restored, even when reading failed. Bytes fetched during the peek remain buffered.
pub async fn peek_token(&mut self) -> Result<Option<JsonToken>, AsyncJsonStreamReaderError> {
    let (position, depth, state, preserve) =
        (self.position, self.depth, self.state, self.preserve_buffer);
    self.preserve_buffer = true;
    let peeked = self.next_token().await;
    self.position = position;
    self.depth = depth;
    self.state = state;
    self.preserve_buffer = preserve;
    peeked
}
/// Reads the next lexical token, refilling the buffer from the reader as needed.
///
/// Returns `Ok(None)` once the reader is exhausted and no buffered bytes remain.
///
/// Token rules:
/// - A string followed (after optional whitespace) by `:` becomes [`JsonToken::Key`],
///   and the colon is consumed. Otherwise it becomes [`JsonToken::String`].
/// - A `,` is reported as [`JsonToken::EndObjectOrListItem`].
/// - `{` / `[` increase `depth`; `}` / `]` decrease it.
///
/// This is a tokenizer, not a validator. It does not check that closers match
/// openers by kind, or that commas and colons appear in grammatical positions.
///
/// # Errors
/// Returns a JSON error for:
/// - an unexpected byte;
/// - a number with a redundant leading zero;
/// - a malformed string, number or literal;
/// - a `}` / `]` with no open container. Without this check `depth` would
///   underflow, which panics in debug builds.
///
/// I/O errors are propagated.
pub async fn next_token(&mut self) -> Result<Option<JsonToken>, AsyncJsonStreamReaderError> {
    loop {
        self.skip_whitespace();
        if self.position >= self.buffer.len() {
            if !self.fill_buffer().await? {
                return Ok(None);
            }
            continue;
        }
        let ch = self.buffer[self.position];
        let token = match ch {
            b'{' | b'[' => {
                self.position += 1;
                self.depth += 1;
                if ch == b'{' {
                    JsonToken::StartObject
                } else {
                    JsonToken::StartArray
                }
            }
            b'}' | b']' => {
                // Reject an unmatched closer instead of letting `depth` underflow
                // (panic in debug builds, wrap to usize::MAX in release).
                self.depth = match self.depth.checked_sub(1) {
                    Some(depth) => depth,
                    None => {
                        return Err(AsyncJsonStreamReaderError::JsonError {
                            error: format!(
                                "Unexpected closing '{}' with no open object or array",
                                ch as char
                            ),
                            position: self.position,
                        });
                    }
                };
                self.position += 1;
                if ch == b'}' {
                    JsonToken::EndObject
                } else {
                    JsonToken::EndArray
                }
            }
            b'"' => {
                let s = self.parse_string().await?;
                // Whitespace between a key and its ':' may straddle one or more
                // chunk boundaries, so keep skipping and refilling until a
                // significant byte is buffered or the input ends.
                loop {
                    self.skip_whitespace();
                    if self.position < self.buffer.len() || !self.fill_buffer().await? {
                        break;
                    }
                }
                if self.position < self.buffer.len() && self.buffer[self.position] == b':' {
                    self.position += 1;
                    JsonToken::Key(s)
                } else {
                    JsonToken::String(s)
                }
            }
            b',' => {
                self.position += 1;
                JsonToken::EndObjectOrListItem
            }
            b'n' => {
                self.parse_literal("null").await?;
                JsonToken::Null
            }
            b't' => {
                self.parse_literal("true").await?;
                JsonToken::Boolean(true)
            }
            b'f' => {
                self.parse_literal("false").await?;
                JsonToken::Boolean(false)
            }
            b'-' | b'0'..=b'9' => {
                let num = self.parse_number().await?;
                // JSON forbids redundant leading zeros ("01", "-00"). A lone "0"
                // or "-0" followed by a fraction or exponent is allowed.
                let digits = num.strip_prefix('-').unwrap_or(num.as_str());
                let has_leading_zero = digits.len() > 1
                    && digits.starts_with('0')
                    && !matches!(digits.as_bytes()[1], b'.' | b'e' | b'E');
                if has_leading_zero {
                    return Err(AsyncJsonStreamReaderError::JsonError {
                        error: "Invalid number: leading zeros are not allowed".to_string(),
                        position: self.position,
                    });
                }
                JsonToken::Number(num)
            }
            other => {
                return Err(AsyncJsonStreamReaderError::JsonError {
                    error: format!("Unexpected JSON character: {}", other as char),
                    position: self.position,
                });
            }
        };
        return Ok(Some(token));
    }
}
/// Reads more bytes from the underlying reader into the buffer.
///
/// With `preserve_buffer` set, new bytes are appended after the existing contents
/// so previously saved positions remain valid. Otherwise the buffer must be fully
/// consumed (`position == buffer.len()`). It is then emptied, or reallocated at the
/// default capacity if it had grown beyond it, and `position` is reset to 0.
///
/// Returns `Ok(false)` when the reader reports end of input.
///
/// # Errors
/// `InternalState` if called without preservation while unconsumed bytes remain;
/// I/O errors from the reader.
async fn fill_buffer(&mut self) -> Result<bool, AsyncJsonStreamReaderError> {
    if self.preserve_buffer {
        self.buffer.reserve(INTERNAL_BUFFER_SIZE);
    } else if self.position != self.buffer.len() {
        return Err(AsyncJsonStreamReaderError::InternalState {
            message: format!(
                "fill_buffer called with an unconsumed buffer: position={}, buffer.len={}",
                self.position,
                self.buffer.len()
            ),
        });
    } else if !self.buffer.is_empty() {
        // Drop an oversized allocation (e.g. left behind by deserialize_object)
        // instead of keeping it around forever.
        if self.buffer.capacity() > INTERNAL_BUFFER_SIZE {
            self.buffer = BytesMut::with_capacity(INTERNAL_BUFFER_SIZE);
        } else {
            self.buffer.clear();
        }
        self.position = 0;
    }
    let read = self.reader.read_buf(&mut self.buffer).await?;
    Ok(read != 0)
}
/// Refills the buffer, but only if every buffered byte has been consumed.
///
/// When `allow_eof` is false, reaching end of input is reported as a JSON error.
/// When it is true, end of input is tolerated silently, and callers must re-check
/// `position < buffer.len()` themselves before indexing.
async fn fill_buffer_if_needed(
    &mut self,
    allow_eof: bool,
) -> Result<(), AsyncJsonStreamReaderError> {
    let exhausted = self.position >= self.buffer.len();
    if exhausted && !self.fill_buffer().await? && !allow_eof {
        return Err(AsyncJsonStreamReaderError::JsonError {
            error: "Unexpected EOF while requesting to fill buffer".to_string(),
            position: self.position,
        });
    }
    Ok(())
}
/// Advances `position` past JSON insignificant whitespace (space, LF, CR, tab) that
/// is already buffered. Never reads from the underlying reader.
fn skip_whitespace(&mut self) {
    while let Some(&byte) = self.buffer.get(self.position) {
        if !matches!(byte, b' ' | b'\n' | b'\r' | b'\t') {
            return;
        }
        self.position += 1;
    }
}
/// Parses a JSON string literal starting at the current `"` and returns its decoded
/// contents, leaving `position` just past the closing quote.
///
/// Fast path: if the closing quote is found in the buffer before any backslash and
/// no refill happened, the bytes between the quotes are copied out in one slice.
/// Otherwise bytes are accumulated in `result_buf` across escapes and buffer
/// refills. The escapes `\" \\ \/ \b \f \n \r \t \uXXXX` are decoded; any other
/// escape is an error. The collected bytes must form valid UTF-8.
///
/// NOTE(review): only `\` and `"` are treated specially, so raw control characters
/// (< 0x20) inside the literal are accepted even though RFC 8259 requires them to be
/// escaped.
///
/// # Errors
/// `UnexpectedToken` if not positioned on `"`. JSON errors for EOF inside the
/// literal, an invalid escape, or invalid UTF-8.
async fn parse_string(&mut self) -> Result<String, AsyncJsonStreamReaderError> {
if self.position >= self.buffer.len() || self.buffer[self.position] != b'"' {
return Err(AsyncJsonStreamReaderError::UnexpectedToken {
expected: "\"",
found: if self.position < self.buffer.len() {
format!("{}", self.buffer[self.position] as char)
} else {
"EOF".to_string()
},
});
}
// Skip the opening quote.
self.position += 1;
// Owned accumulator; stays None on the single-slice fast path.
let mut result_buf: Option<Vec<u8>> = None;
// Start of the not-yet-copied run of literal bytes in the current buffer.
let mut string_start_in_buffer = self.position;
loop {
// Find the next byte that ends a plain run: a backslash or the closing quote.
let searchable_slice = &self.buffer[self.position..];
let mut special_char_offset = None;
for (i, &byte) in searchable_slice.iter().enumerate() {
if byte == b'\\' || byte == b'"' {
special_char_offset = Some(i);
break;
}
}
if let Some(offset) = special_char_offset {
let special_byte = searchable_slice[offset];
// Plain bytes between the cursor and the special byte.
let current_slice = &searchable_slice[..offset];
if special_byte == b'"' {
// Closing quote: consume it and finish, via the accumulator if one exists.
self.position += offset + 1; return if let Some(mut existing_buf) = result_buf {
existing_buf.extend_from_slice(current_slice);
String::from_utf8(existing_buf).map_err(|e| {
AsyncJsonStreamReaderError::JsonError {
error: format!("Invalid UTF-8 in string: {}", e),
position: self.position,
}
})
} else {
String::from_utf8(current_slice.to_vec()).map_err(|e| {
AsyncJsonStreamReaderError::JsonError {
error: format!("Invalid UTF-8 in string: {}", e),
position: self.position,
}
})
};
}
// Backslash: flush the preceding plain run into the accumulator (creating it on
// first use) before decoding the escape.
let buf = if let Some(b) = result_buf.as_mut() {
b.extend_from_slice(current_slice);
b
} else {
let leading_slice =
&self.buffer[string_start_in_buffer..self.position + offset];
result_buf.insert(leading_slice.to_vec())
};
// Skip the backslash; the escape selector may be in the next chunk.
self.position += offset + 1; self.fill_buffer_if_needed(false).await?;
let escaped_char_selector = self.buffer[self.position];
self.position += 1;
match escaped_char_selector {
b'"' => buf.push(b'"'),
b'\\' => buf.push(b'\\'),
b'/' => buf.push(b'/'),
// \b, \f, \n, \r, \t map to ASCII 8, 12, 10, 13, 9; \u is decoded separately.
b'b' => buf.push(8), b'f' => buf.push(12), b'n' => buf.push(10), b'r' => buf.push(13), b't' => buf.push(9), b'u' => {
let unicode_bytes = self.parse_unicode_escape().await?;
buf.extend_from_slice(&unicode_bytes);
}
_ => {
return Err(AsyncJsonStreamReaderError::JsonError {
error: format!(
"Invalid JSON escape sequence: \\{}",
escaped_char_selector as char
),
position: self.position,
});
}
}
// The next plain run begins right after the escape.
string_start_in_buffer = self.position; continue;
}
// No special byte buffered: stash the rest of the buffer and read more input.
let remaining_slice = &self.buffer[string_start_in_buffer..];
result_buf
.get_or_insert_with(|| Vec::with_capacity(remaining_slice.len() * 2))
.extend_from_slice(remaining_slice);
// Mark everything consumed so fill_buffer's invariant holds.
self.position = self.buffer.len();
if !self.fill_buffer().await? {
return Err(AsyncJsonStreamReaderError::JsonError {
error: "Unexpected EOF while parsing string".to_string(),
position: self.position,
});
}
string_start_in_buffer = self.position;
}
}
/// Decodes the hex part of a `\uXXXX` escape (the `\u` has already been consumed)
/// and returns the UTF-8 encoding of the resulting character.
///
/// A high surrogate (`D800`–`DBFF`) must be immediately followed by a `\uXXXX` low
/// surrogate (`DC00`–`DFFF`). The pair is combined into one supplementary-plane
/// character. A lone low surrogate is rejected.
///
/// Error positions are best-effort offsets into the current buffer. They use
/// saturating arithmetic because a refill in the middle of the escape resets
/// `position` close to 0, and a plain subtraction would underflow and panic.
///
/// # Errors
/// JSON errors for malformed hex, unpaired surrogates, or EOF inside the escape.
async fn parse_unicode_escape(&mut self) -> Result<Vec<u8>, AsyncJsonStreamReaderError> {
    let codepoint = self.read_four_hex_digits().await?;
    let final_codepoint = if (0xD800..=0xDBFF).contains(&codepoint) {
        // A high surrogate is only meaningful as the first half of a pair, so the
        // very next bytes must be `\u` introducing the low surrogate.
        for expected in [b'\\', b'u'] {
            self.fill_buffer_if_needed(false).await?;
            if self.buffer[self.position] != expected {
                return Err(AsyncJsonStreamReaderError::JsonError {
                    error: "Unmatched high surrogate in Unicode escape sequence".to_string(),
                    position: self.position,
                });
            }
            self.position += 1;
        }
        let low_surrogate = self.read_four_hex_digits().await?;
        if !(0xDC00..=0xDFFF).contains(&low_surrogate) {
            return Err(AsyncJsonStreamReaderError::JsonError {
                error: format!(
                    "High surrogate not followed by low surrogate. Got {:x}",
                    low_surrogate
                ),
                position: self.position.saturating_sub(6),
            });
        }
        0x10000 + ((codepoint - 0xD800) << 10) + (low_surrogate - 0xDC00)
    } else if (0xDC00..=0xDFFF).contains(&codepoint) {
        return Err(AsyncJsonStreamReaderError::JsonError {
            error: "Unmatched low surrogate in Unicode escape sequence".to_string(),
            position: self.position.saturating_sub(4),
        });
    } else {
        codepoint
    };
    let c = char::from_u32(final_codepoint).ok_or_else(|| {
        AsyncJsonStreamReaderError::JsonError {
            error: format!("Invalid unicode codepoint: {:x}", final_codepoint),
            position: self.position,
        }
    })?;
    let mut utf8 = [0u8; 4];
    Ok(c.encode_utf8(&mut utf8).as_bytes().to_vec())
}
/// Reads exactly four hexadecimal digits (the `XXXX` of a `\uXXXX` escape) and
/// returns their value.
///
/// Every byte is checked with `is_ascii_hexdigit` before conversion, because
/// `u32::from_str_radix` on its own accepts a leading `+` (so `\u+041` would
/// otherwise decode as `A`), which is not valid JSON.
///
/// # Errors
/// JSON errors for EOF inside the escape or for non-hex bytes.
async fn read_four_hex_digits(&mut self) -> Result<u32, AsyncJsonStreamReaderError> {
    let mut hex_chars = [0u8; 4];
    if self.position + 4 <= self.buffer.len() {
        // Fast path: all four digits are already buffered.
        hex_chars.copy_from_slice(&self.buffer[self.position..self.position + 4]);
        self.position += 4;
    } else {
        // Slow path: the escape straddles a chunk boundary; refill byte by byte.
        for hex_char in &mut hex_chars {
            self.fill_buffer_if_needed(false).await?;
            *hex_char = self.buffer[self.position];
            self.position += 1;
        }
    }
    // A refill in the slow path can leave `position` below 4, so a plain
    // `self.position - 4` could underflow; clamp to 0 instead.
    let escape_start = self.position.saturating_sub(4);
    let hex_str =
        std::str::from_utf8(&hex_chars).map_err(|_| AsyncJsonStreamReaderError::JsonError {
            error: "Invalid UTF-8 in unicode escape sequence".to_string(),
            position: escape_start,
        })?;
    let invalid_hex = || AsyncJsonStreamReaderError::JsonError {
        error: format!("Invalid hex value in unicode escape sequence: {}", hex_str),
        position: escape_start,
    };
    if !hex_str.bytes().all(|b| b.is_ascii_hexdigit()) {
        return Err(invalid_hex());
    }
    u32::from_str_radix(hex_str, 16).map_err(|_| invalid_hex())
}
/// Scans a JSON number starting at the cursor and returns its raw text.
///
/// Grammar accepted: optional `-`, one or more digits, an optional fraction (`.`
/// plus at least one digit), and an optional exponent (`e`/`E`, optional sign, at
/// least one digit). The buffer is refilled as needed, so a number may span chunk
/// boundaries. Redundant leading zeros are not checked here (`next_token` does that).
///
/// # Errors
/// JSON errors for a missing integer part, or a fraction or exponent without digits.
async fn parse_number(&mut self) -> Result<String, AsyncJsonStreamReaderError> {
    let mut result = String::new();

    // Optional leading minus sign.
    self.fill_buffer_if_needed(false).await?;
    if self.buffer.get(self.position) == Some(&b'-') {
        result.push('-');
        self.position += 1;
    }

    // Integer part: at least one digit is required.
    if !self.take_digit_run(&mut result).await? {
        return Err(AsyncJsonStreamReaderError::JsonError {
            error: format!("InvalidNumber: {result}"),
            position: self.position,
        });
    }

    // Optional fraction: '.' must be followed by at least one digit.
    self.fill_buffer_if_needed(true).await?;
    if self.buffer.get(self.position) == Some(&b'.') {
        result.push('.');
        self.position += 1;
        self.fill_buffer_if_needed(true).await?;
        match self.buffer.get(self.position).copied() {
            None => {
                return Err(AsyncJsonStreamReaderError::JsonError {
                    error: "InvalidNumber: unexpected EOF after decimal point".to_string(),
                    position: self.position,
                });
            }
            Some(b) if !b.is_ascii_digit() => {
                return Err(AsyncJsonStreamReaderError::JsonError {
                    error: format!("Invalid character after decimal point: {}", b as char),
                    position: self.position,
                });
            }
            Some(_) => {}
        }
        self.take_digit_run(&mut result).await?;
    }

    // Optional exponent: 'e'/'E', an optional sign, then at least one digit.
    self.fill_buffer_if_needed(true).await?;
    if let Some(marker @ (b'e' | b'E')) = self.buffer.get(self.position).copied() {
        result.push(marker as char);
        self.position += 1;
        self.fill_buffer_if_needed(true).await?;
        if let Some(sign @ (b'+' | b'-')) = self.buffer.get(self.position).copied() {
            result.push(sign as char);
            self.position += 1;
        }
        self.fill_buffer_if_needed(true).await?;
        match self.buffer.get(self.position).copied() {
            None => {
                return Err(AsyncJsonStreamReaderError::JsonError {
                    error: "InvalidNumber: unexpected EOF after exponent".to_string(),
                    position: self.position,
                });
            }
            Some(b) if !b.is_ascii_digit() => {
                return Err(AsyncJsonStreamReaderError::JsonError {
                    error: format!("Invalid character after exponent: {}", b as char),
                    position: self.position,
                });
            }
            Some(_) => {}
        }
        self.take_digit_run(&mut result).await?;
    }

    Ok(result)
}

/// Consumes a maximal run of ASCII digits (refilling the buffer as needed, EOF
/// allowed) and appends them to `out`. Returns whether at least one digit was taken.
async fn take_digit_run(&mut self, out: &mut String) -> Result<bool, AsyncJsonStreamReaderError> {
    let mut took_any = false;
    loop {
        self.fill_buffer_if_needed(true).await?;
        match self.buffer.get(self.position).copied() {
            Some(d) if d.is_ascii_digit() => {
                out.push(char::from(d));
                self.position += 1;
                took_any = true;
            }
            _ => return Ok(took_any),
        }
    }
}
/// Consumes the exact byte sequence `expected` (used for `null`, `true`, `false`).
///
/// # Errors
/// `UnexpectedToken` at the first mismatching byte; a JSON error if the input ends
/// before the literal is complete.
async fn parse_literal(
    &mut self,
    expected: &'static str,
) -> Result<(), AsyncJsonStreamReaderError> {
    for &want in expected.as_bytes() {
        self.fill_buffer_if_needed(false).await?;
        let got = self.buffer[self.position];
        if got != want {
            return Err(AsyncJsonStreamReaderError::UnexpectedToken {
                expected,
                found: format!("{}", got as char),
            });
        }
        self.position += 1;
    }
    Ok(())
}
/// Advances to the next key of the current object and returns it, or `Ok(None)`
/// when the object's closing `}` is reached.
///
/// On first use this opens the top-level object. If the value belonging to the
/// previously returned key was never consumed, it is skipped first.
pub async fn next_object_entry(
    &mut self,
) -> Result<Option<String>, AsyncJsonStreamReaderError> {
    match self.state {
        ReaderState::Start => {
            self.start_object().await?;
            self.state = ReaderState::Object;
        }
        ReaderState::Value => {
            self.skip_value().await?;
            self.state = ReaderState::Object;
        }
        ReaderState::Object => {}
    }
    let key = self.read_key().await?;
    if key.is_some() {
        self.state = ReaderState::Value;
    }
    Ok(key)
}
/// Consumes one complete JSON value (scalar, object or array) and returns the API
/// to the `Object` state.
///
/// # Errors
/// A JSON error if the input ends before the value is complete; `UnexpectedToken`
/// if the next token cannot start a value (e.g. a key, `,` or a closer).
pub async fn skip_value(&mut self) -> Result<(), AsyncJsonStreamReaderError> {
    let base_depth = self.depth;
    let token = match self.next_token().await? {
        Some(token) => token,
        None => {
            return Err(AsyncJsonStreamReaderError::JsonError {
                error: "Unexpected EOF while skipping value".to_string(),
                position: self.position,
            });
        }
    };
    let is_container = match token {
        JsonToken::StartObject | JsonToken::StartArray => true,
        JsonToken::String(_) | JsonToken::Number(_) | JsonToken::Boolean(_) | JsonToken::Null => {
            false
        }
        _ => {
            return Err(AsyncJsonStreamReaderError::UnexpectedToken {
                expected: "a value",
                found: format!("{token:?}"),
            });
        }
    };
    // Drain tokens until the container opened above has been closed again.
    while is_container && self.depth > base_depth {
        if self.next_token().await?.is_none() {
            return Err(AsyncJsonStreamReaderError::JsonError {
                error: "Unexpected EOF while skipping value".to_string(),
                position: self.position,
            });
        }
    }
    self.state = ReaderState::Object;
    Ok(())
}
/// Skips forward within the current object until its own key `target_key` is
/// found, leaving the reader positioned on that key's value.
///
/// Keys of nested objects are ignored. Only keys at the current object's depth
/// match.
///
/// # Errors
/// "key not found" if the object (or the input) ends first; by then the rest of
/// the object has been consumed.
pub async fn skip_to_key(
    &mut self,
    target_key: &str,
) -> Result<(), AsyncJsonStreamReaderError> {
    match self.state {
        ReaderState::Start => {
            self.start_object().await?;
            self.state = ReaderState::Object;
        }
        ReaderState::Value => {
            self.skip_value().await?;
            self.state = ReaderState::Object;
        }
        ReaderState::Object => {}
    }
    let object_depth = self.depth;
    while let Some(token) = self.next_token().await? {
        let at_object_level = self.depth == object_depth;
        match token {
            JsonToken::Key(key) if at_object_level && key == target_key => {
                self.state = ReaderState::Value;
                return Ok(());
            }
            JsonToken::EndObject if self.depth < object_depth => break,
            _ => {}
        }
    }
    Err(AsyncJsonStreamReaderError::JsonError {
        error: "key not found".to_string(),
        position: self.position,
    })
}
pub async fn skip_object(&mut self) -> Result<(), AsyncJsonStreamReaderError> {
let start_depth = self.depth;
self.start_object().await?;
while self.depth > start_depth {
self.next_token().await?;
}
self.state = ReaderState::Object;
Ok(())
}
/// Positions the reader on the next element of an array value.
///
/// Accepts `[` (array start), `,` (separator) or `]` (array end). Returns `true`
/// when an element follows and `false` when the array is finished. An empty `[]` is
/// consumed entirely. In the `false` case the API returns to the `Object` state.
///
/// # Errors
/// `UnexpectedToken` for any other token (or EOF).
pub async fn start_array_item(&mut self) -> Result<bool, AsyncJsonStreamReaderError> {
    let token = self.next_token().await?;
    match token {
        Some(JsonToken::EndObjectOrListItem) => Ok(true),
        Some(JsonToken::EndArray) => {
            self.state = ReaderState::Object;
            Ok(false)
        }
        Some(JsonToken::StartArray) => {
            let empty = matches!(self.peek_token().await?, Some(JsonToken::EndArray));
            if empty {
                // Consume the `]` of the empty array.
                self.next_token().await?;
                self.state = ReaderState::Object;
            }
            Ok(!empty)
        }
        unexpected => Err(AsyncJsonStreamReaderError::UnexpectedToken {
            expected: "[ or , or ]",
            found: format!("{unexpected:?}"),
        }),
    }
}
/// Consumes the `{` that must come next and sets the API state to `Object`.
///
/// # Errors
/// `UnexpectedToken` if the next token (or EOF) is anything else.
pub async fn start_object(&mut self) -> Result<(), AsyncJsonStreamReaderError> {
    let unexpected = match self.next_token().await? {
        Some(JsonToken::StartObject) => {
            self.state = ReaderState::Object;
            return Ok(());
        }
        other => other,
    };
    Err(AsyncJsonStreamReaderError::UnexpectedToken {
        expected: "{",
        found: format!("{unexpected:?}"),
    })
}
/// Reads the next token, which must be a string value, and returns its contents.
///
/// # Errors
/// `UnexpectedToken` if the next token is not a string.
pub async fn read_string(&mut self) -> Result<String, AsyncJsonStreamReaderError> {
    let value = match self.next_token().await? {
        Some(JsonToken::String(value)) => value,
        unexpected => {
            return Err(AsyncJsonStreamReaderError::UnexpectedToken {
                expected: "string",
                found: format!("{unexpected:?}"),
            });
        }
    };
    self.state = ReaderState::Object;
    Ok(value)
}
/// Reads the next token, which must be a string or `null`; `null` yields `None`.
///
/// # Errors
/// `UnexpectedToken` for any other token.
pub async fn read_nullable_string(
    &mut self,
) -> Result<Option<String>, AsyncJsonStreamReaderError> {
    let value = match self.next_token().await? {
        Some(JsonToken::String(s)) => Some(s),
        Some(JsonToken::Null) => None,
        unexpected => {
            return Err(AsyncJsonStreamReaderError::UnexpectedToken {
                expected: "string",
                found: format!("{unexpected:?}"),
            });
        }
    };
    self.state = ReaderState::Object;
    Ok(value)
}
/// Reads the next token, which must be a number, and parses its raw text as `T`.
///
/// The API state returns to `Object` once a number token was read, even if the
/// conversion to `T` then fails.
///
/// # Errors
/// `UnexpectedToken` if the next token is not a number; a JSON error if `T::from_str`
/// rejects the text (e.g. a fraction parsed as an integer type).
pub async fn read_number<T>(&mut self) -> Result<T, AsyncJsonStreamReaderError>
where
    T: FromStr,
    <T as FromStr>::Err: std::fmt::Debug,
{
    let n = match self.next_token().await? {
        Some(JsonToken::Number(n)) => n,
        unexpected => {
            return Err(AsyncJsonStreamReaderError::UnexpectedToken {
                expected: "number",
                found: format!("{unexpected:?}"),
            });
        }
    };
    let parsed = n.parse::<T>();
    self.state = ReaderState::Object;
    parsed.map_err(|e| AsyncJsonStreamReaderError::JsonError {
        error: format!("Can't parse number {n}: {e:#?}"),
        position: self.position,
    })
}
/// Reads the next token, which must be `true` or `false`.
///
/// # Errors
/// `UnexpectedToken` for any other token.
pub async fn read_boolean(&mut self) -> Result<bool, AsyncJsonStreamReaderError> {
    let flag = match self.next_token().await? {
        Some(JsonToken::Boolean(b)) => b,
        unexpected => {
            return Err(AsyncJsonStreamReaderError::UnexpectedToken {
                expected: "boolean",
                found: format!("{unexpected:?}"),
            });
        }
    };
    self.state = ReaderState::Object;
    Ok(flag)
}
/// Reads the next object key, or returns `Ok(None)` at the closing `}`.
///
/// Any number of `,` tokens before the key are skipped, so repeated or leading
/// commas are not rejected here. The API state is not modified.
///
/// # Errors
/// `UnexpectedToken` for anything other than a key, `,` or `}`.
pub async fn read_key(&mut self) -> Result<Option<String>, AsyncJsonStreamReaderError> {
    loop {
        let token = self.next_token().await?;
        if matches!(token, Some(JsonToken::EndObjectOrListItem)) {
            continue;
        }
        return match token {
            Some(JsonToken::Key(k)) => Ok(Some(k)),
            Some(JsonToken::EndObject) => Ok(None),
            unexpected => Err(AsyncJsonStreamReaderError::UnexpectedToken {
                expected: "key or end of object",
                found: format!("{unexpected:?}"),
            }),
        };
    }
}
/// Reads the next value, which must be an object, and deserializes it with
/// `serde_json` into a map.
///
/// While the object is scanned, `preserve_buffer` is forced on. Refills then append
/// to the buffer, so the raw bytes from the starting cursor to the closing `}` stay
/// contiguous, and that slice is handed to `serde_json::from_slice`. The previous
/// `preserve_buffer` setting is restored on every exit path, including errors, so
/// a failed call does not leave the buffer growing without bound.
///
/// # Errors
/// Fails if:
/// - the next token is not `{`;
/// - the input ends before the object is closed;
/// - the tokenizer reports an error;
/// - `serde_json` rejects the bytes.
pub async fn deserialize_object(
    &mut self,
) -> Result<Map<String, Value>, AsyncJsonStreamReaderError> {
    let saved_preserve_buffer = self.preserve_buffer;
    self.preserve_buffer = true;
    let result = self.scan_and_deserialize_object().await;
    self.preserve_buffer = saved_preserve_buffer;
    result
}

/// Scans one complete object (with `preserve_buffer` already enabled by the caller)
/// and parses its raw bytes with `serde_json`.
async fn scan_and_deserialize_object(
    &mut self,
) -> Result<Map<String, Value>, AsyncJsonStreamReaderError> {
    let start_depth = self.depth;
    let start_pos = self.position;
    self.start_object().await?;
    while self.depth > start_depth {
        // next_token yields Ok(None) forever at EOF; without this check a
        // truncated object would hang the caller.
        if self.next_token().await?.is_none() {
            return Err(AsyncJsonStreamReaderError::JsonError {
                error: "Unexpected EOF while deserializing object".to_string(),
                position: self.position,
            });
        }
    }
    self.state = ReaderState::Object;
    serde_json::from_slice(&self.buffer[start_pos..self.position]).map_err(|e| {
        AsyncJsonStreamReaderError::JsonError {
            error: format!("deserialize_object error: {e:?}"),
            position: start_pos,
        }
    })
}
}
#[cfg(test)]
mod tests {
use super::*;
use assert_matches::assert_matches;
use std::io::Cursor;
fn create_reader(json: &str) -> AsyncJsonStreamReader<Cursor<Vec<u8>>> {
AsyncJsonStreamReader::new(Cursor::new(json.as_bytes().to_vec()))
}
#[tokio::test]
async fn test_empty_json_object() {
let mut reader = create_reader("{}");
assert_eq!(
reader.next_token().await.unwrap(),
Some(JsonToken::StartObject)
);
assert_eq!(
reader.next_token().await.unwrap(),
Some(JsonToken::EndObject)
);
assert_eq!(reader.next_token().await.unwrap(), None);
}
#[tokio::test]
async fn test_empty_json_array() {
let mut reader = create_reader("[]");
assert_eq!(
reader.next_token().await.unwrap(),
Some(JsonToken::StartArray)
);
assert_eq!(
reader.next_token().await.unwrap(),
Some(JsonToken::EndArray)
);
assert_eq!(reader.next_token().await.unwrap(), None);
}
#[tokio::test]
async fn test_simple_key_value_pair() {
let mut reader = create_reader(r#"{"name": "test"}"#);
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "name");
assert_eq!(reader.read_string().await.unwrap(), "test");
assert!(reader.next_object_entry().await.unwrap().is_none());
}
#[tokio::test]
async fn test_nested_objects() {
let mut reader = create_reader(r#"{"outer": {"inner": 42}}"#);
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "outer");
let inner_obj = reader.deserialize_object().await.unwrap();
assert_eq!(inner_obj.get("inner").unwrap().as_i64().unwrap(), 42);
assert!(reader.next_object_entry().await.unwrap().is_none());
}
#[tokio::test]
async fn test_array_with_values() {
let mut reader = create_reader(r#"[1, "text", true, null]"#);
assert_eq!(
reader.next_token().await.unwrap(),
Some(JsonToken::StartArray)
);
assert_eq!(
reader.next_token().await.unwrap(),
Some(JsonToken::Number("1".to_string()))
);
assert_eq!(
reader.next_token().await.unwrap(),
Some(JsonToken::EndObjectOrListItem)
);
assert_eq!(
reader.next_token().await.unwrap(),
Some(JsonToken::String("text".to_string()))
);
assert_eq!(
reader.next_token().await.unwrap(),
Some(JsonToken::EndObjectOrListItem)
);
assert_eq!(
reader.next_token().await.unwrap(),
Some(JsonToken::Boolean(true))
);
assert_eq!(
reader.next_token().await.unwrap(),
Some(JsonToken::EndObjectOrListItem)
);
assert_eq!(reader.next_token().await.unwrap(), Some(JsonToken::Null));
assert_eq!(
reader.next_token().await.unwrap(),
Some(JsonToken::EndArray)
);
assert_eq!(reader.next_token().await.unwrap(), None);
}
#[tokio::test]
async fn test_next_object_entry_skip_value() {
let mut reader = create_reader(r#"{"a": 1, "b": {"foo": "bar"}, "c": 3}"#);
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "a");
assert_eq!(reader.read_number::<i32>().await.unwrap(), 1);
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "b");
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "c");
assert_eq!(reader.read_number::<i32>().await.unwrap(), 3);
assert!(reader.next_object_entry().await.unwrap().is_none());
}
#[tokio::test]
async fn test_skip_value_resets_state_for_next_entry() {
let mut reader = create_reader(r#"{"a": {"x": 1}, "b": 2}"#);
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "a");
reader.skip_value().await.unwrap();
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "b");
assert_eq!(reader.read_number::<i32>().await.unwrap(), 2);
}
#[tokio::test]
async fn test_peek_token_does_not_consume() {
let mut reader = create_reader(r#"{"a": 1}"#);
assert_eq!(
reader.peek_token().await.unwrap(),
Some(JsonToken::StartObject)
);
assert_eq!(
reader.next_token().await.unwrap(),
Some(JsonToken::StartObject)
);
}
#[tokio::test]
async fn test_skip_value_object() {
let mut reader = create_reader(r#"{"name": {"first": "John", "last": "Doe"}, "age": 30}"#);
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "name");
reader.skip_object().await.unwrap();
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "age");
assert_eq!(reader.read_number::<i32>().await.unwrap(), 30);
}
#[tokio::test]
async fn test_read_methods() {
let mut reader = create_reader(
r#"{"str": "hello", "num": 42, "float": 1.337, "exp": -1.24e+10, "bool": true}"#,
);
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "str");
assert_eq!(reader.read_string().await.unwrap(), "hello");
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "num");
assert_eq!(reader.read_number::<i64>().await.unwrap(), 42);
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "float");
assert_eq!(reader.read_number::<f64>().await.unwrap(), 1.337_f64);
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "exp");
assert_eq!(reader.read_number::<f64>().await.unwrap(), -1.24e+10);
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "bool");
assert!(reader.read_boolean().await.unwrap());
assert!(reader.next_object_entry().await.unwrap().is_none());
}
#[tokio::test]
async fn test_read_nullable_string_null_resets_state() {
let mut reader = create_reader(r#"{"a": null, "b": "ok"}"#);
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "a");
assert!(reader.read_nullable_string().await.unwrap().is_none());
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "b");
assert_eq!(reader.read_string().await.unwrap(), "ok");
assert!(reader.next_object_entry().await.unwrap().is_none());
}
#[tokio::test]
async fn test_skip_to_key_ignores_nested_keys() {
let mut reader = create_reader(r#"{"outer":{"id":1},"id":2,"tail":3}"#);
reader.skip_to_key("id").await.unwrap();
assert_eq!(reader.read_number::<i32>().await.unwrap(), 2);
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "tail");
assert_eq!(reader.read_number::<i32>().await.unwrap(), 3);
}
#[tokio::test]
async fn test_skip_value_errors_on_unexpected_eof() {
let mut reader = create_reader(r#"{"a":{"b":1"#);
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "a");
let err = reader.next_object_entry().await.unwrap_err();
match err {
AsyncJsonStreamReaderError::JsonError { error, .. } => {
assert!(error.contains("Unexpected EOF while skipping value"));
}
_ => panic!("Expected JsonError for unexpected EOF"),
}
}
#[tokio::test]
async fn test_fill_buffer_invariant_error() {
let mut reader = AsyncJsonStreamReader::new(Cursor::new(b"{}".to_vec()));
reader.buffer.extend_from_slice(b"{}");
reader.position = 0;
let err = reader.fill_buffer().await.unwrap_err();
match err {
AsyncJsonStreamReaderError::InternalState { message } => {
assert!(message.contains("unconsumed buffer"));
}
_ => panic!("Expected InternalState error"),
}
}
#[tokio::test]
async fn test_error_handling() {
let mut reader = create_reader(r#"{"missing_quote: 42}"#);
assert!(reader.next_token().await.is_ok());
assert!(reader.next_token().await.is_err());
}
#[tokio::test]
async fn test_user_example_scenario() {
let json = r#"{"status":"success","blah":1234,"results":[{"name":"John","age":30},{"name":"Jane","age":25}]}"#;
let mut reader = create_reader(json);
let mut status = None;
let mut results = Vec::new();
while let Some(key) = reader.next_object_entry().await.unwrap() {
match key.as_str() {
"status" => {
status = Some(reader.read_string().await.unwrap());
}
"results" => {
while reader.start_array_item().await.unwrap() {
let obj = reader.deserialize_object().await.unwrap();
results.push(obj);
}
}
"blah" => {
}
_ => panic!("Unexpected key"),
}
}
assert_eq!(status, Some("success".to_string()));
assert_eq!(results.len(), 2);
let john = &results[0];
assert_eq!(john.get("name").unwrap().as_str().unwrap(), "John");
assert_eq!(john.get("age").unwrap().as_i64().unwrap(), 30);
let jane = &results[1];
assert_eq!(jane.get("name").unwrap().as_str().unwrap(), "Jane");
assert_eq!(jane.get("age").unwrap().as_i64().unwrap(), 25);
}
#[tokio::test]
async fn test_empty_array_in_object() {
let json = r#"{"status":"success","data":[],"count":0}"#;
let mut reader = create_reader(json);
let mut status = None;
let mut data_items_count = 0;
let mut count = None;
while let Some(key) = reader.next_object_entry().await.unwrap() {
match key.as_str() {
"status" => {
status = Some(reader.read_string().await.unwrap());
}
"data" => {
while reader.start_array_item().await.unwrap() {
data_items_count += 1;
let _ = reader.deserialize_object().await.unwrap();
}
}
"count" => {
count = Some(reader.read_number::<i32>().await.unwrap());
}
_ => panic!("Unexpected key"),
}
}
assert_eq!(status, Some("success".to_string()));
assert_eq!(data_items_count, 0); assert_eq!(count, Some(0));
}
#[tokio::test]
async fn test_deserialize_large_object() {
let long_string = "a".repeat(INTERNAL_BUFFER_SIZE);
let key_string = "long_key";
let large_object_json =
format!(r#"{{"key": "value", "{}": "{}"}}"#, key_string, long_string);
let full_json = format!(r#"{{"results": [{}]}}"#, large_object_json);
let mut reader = create_reader(&full_json);
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "results");
assert!(reader.start_array_item().await.unwrap());
let obj1 = reader.deserialize_object().await.unwrap();
assert_eq!(obj1.get("key").unwrap().as_str().unwrap(), "value");
assert_eq!(obj1.get(key_string).unwrap().as_str().unwrap(), long_string);
assert!(!reader.start_array_item().await.unwrap());
assert!(reader.next_object_entry().await.unwrap().is_none());
}
#[tokio::test]
async fn test_key_crosses_chunk_boundary() {
let part1 = b"{\"foo\"".to_vec();
let part2 = b": \"bar\"}".to_vec();
let reader_p1 = Cursor::new(part1);
let reader_p2 = Cursor::new(part2);
let chained_reader = tokio::io::AsyncReadExt::chain(reader_p1, reader_p2);
let mut reader = AsyncJsonStreamReader::new(chained_reader);
let key = reader.next_object_entry().await.unwrap().unwrap();
assert_eq!(key, "foo");
let value = reader.read_string().await.unwrap();
assert_eq!(value, "bar");
assert!(reader.next_object_entry().await.unwrap().is_none());
}
use serde::Deserialize;
use std::time::Instant;
fn generate_large_json(num_entries: usize) -> String {
let mut results = Vec::with_capacity(num_entries);
for i in 0..num_entries {
results.push(format!(
r#"{{"id": {}, "name": "name_{}", "extra_data": "some_long_string_to_ignore_{}", "value": {}, "is_active": {}}}"#,
i,
i,
"x".repeat(100), i as f64 * 1.1,
if i % 2 == 0 { "true" } else { "false" }
));
}
format!(
r#"{{"status":"success","useless_data": "{}","results":[{}]}}"#,
"y".repeat(1024), results.join(",")
)
}
#[tokio::test]
#[ignore]
async fn performance_test_very_large_json_selective_parsing() {
const NUM_ENTRIES: usize = 500_000; let json_data = generate_large_json(NUM_ENTRIES);
let json_bytes = json_data.as_bytes().to_vec();
println!("\n--- Performance Test: Selective Parsing (500k entries, ~112MB) ---");
println!(
"Parsing {} entries from a JSON of size {} bytes.",
NUM_ENTRIES,
json_data.len()
);
let start_async = Instant::now();
let mut reader = create_reader(&json_data);
let mut found_ids = Vec::with_capacity(NUM_ENTRIES);
while let Some(key) = reader.next_object_entry().await.unwrap() {
if key == "results" {
while reader.start_array_item().await.unwrap() {
reader.start_object().await.unwrap();
let mut id = None;
while let Some(inner_key) = reader.read_key().await.unwrap() {
if inner_key == "id" {
id = Some(reader.read_number::<u64>().await.unwrap());
} else {
reader.skip_value().await.unwrap();
}
}
if let Some(id_val) = id {
found_ids.push(id_val);
}
}
}
}
let duration_async = start_async.elapsed();
assert_eq!(found_ids.len(), NUM_ENTRIES);
println!(
"1. AsyncJsonStreamReader (manual parsing): {:?}",
duration_async
);
#[derive(Deserialize)]
struct ResultEntry {
id: u64,
}
#[derive(Deserialize)]
struct Response {
results: Vec<ResultEntry>,
}
let start_serde_struct = Instant::now();
let parsed: Response = serde_json::from_slice(&json_bytes).unwrap();
let serde_ids: Vec<u64> = parsed.results.into_iter().map(|r| r.id).collect();
let duration_serde_struct = start_serde_struct.elapsed();
assert_eq!(serde_ids.len(), NUM_ENTRIES);
println!(
"2. serde_json (selective struct): {:?}",
duration_serde_struct
);
let start_serde_value = Instant::now();
let parsed_value: serde_json::Value = serde_json::from_slice(&json_bytes).unwrap();
let result_ids: Vec<u64> = parsed_value["results"]
.as_array()
.unwrap()
.iter()
.map(|v| v["id"].as_u64().unwrap())
.collect();
let duration_serde_value = start_serde_value.elapsed();
assert_eq!(result_ids.len(), NUM_ENTRIES);
println!(
"3. serde_json (generic Value): {:?}",
duration_serde_value
);
}
#[tokio::test]
async fn test_string_with_escapes() {
    // Covers the short escapes \n, \t, \" and \\ plus a \u0041 ('A') unicode escape.
    let input = r#"{"key": "hello\nworld\t\"escaped quote\\final backslash\u0041BC"}"#;
    let expected = "hello\nworld\t\"escaped quote\\final backslashABC";

    let mut reader = create_reader(input);
    let first_key = reader.next_object_entry().await.unwrap().unwrap();
    assert_eq!(first_key, "key");

    let decoded = reader.read_string().await.unwrap();
    assert_eq!(decoded, expected);
}
#[tokio::test]
async fn test_complex_json_correctness_vs_serde() {
    // One document exercising every JSON value kind plus awkward corners: escapes,
    // a UTF-16 surrogate pair, empty containers, mixed-type arrays and a key made of
    // spaces/symbols. serde_json is the reference; the streaming result must match it.
    let document = r#"
{
"string_key": "value",
"int_key": 123,
"float_key": -45.67,
"scientific_notation": 1.23e-4,
"bool_true": true,
"bool_false": false,
"null_key": null,
"empty_string": "",
"whitespace_string": " \n\t ",
"escaped_string": "line1\nline2\tline3\"quote\\slash/",
"unicode_string": "Alpha: \u03b1, Smiley: \uD83D\uDE00",
"empty_object": {},
"empty_array": [],
"nested_object": {
"a": 1,
"b": "two",
"c": [true, false, null]
},
"array_of_objects": [
{"id": 1, "tags": ["A", "B"]},
{"id": 2, "tags": ["C"]},
{}
],
"array_with_mixed_types": [
"string",
100,
-1.1,
true,
null,
{},
[]
],
"key with spaces and symbols!@#$%^&*()": "value for complex key"
}
"#;
    let reference: Value = serde_json::from_str(document).unwrap();
    let expected = reference.as_object().unwrap();

    let mut reader = create_reader(document);
    let actual = reader.deserialize_object().await.unwrap();

    assert_eq!(actual, *expected);
}
#[tokio::test]
async fn test_deeply_nested_object() {
    // Input shape: {"key": {"a":{"a": ... 42 ... }}} with `depth` nested single-key objects.
    let depth = 50;
    let json = format!(
        "{{\"key\": {}42{}}}",
        "{\"a\":".repeat(depth),
        "}".repeat(depth)
    );

    let mut reader = create_reader(&json);
    let key = reader.next_object_entry().await.unwrap().unwrap();
    assert_eq!(key, "key");

    // `outer` is the first nested level; following "a" once more per remaining level
    // lands on the innermost value.
    let outer = reader.deserialize_object().await.unwrap();
    let innermost = (1..depth).fold(outer.get("a").unwrap(), |node, _| {
        node.as_object().unwrap().get("a").unwrap()
    });
    assert_eq!(innermost.as_i64().unwrap(), 42);

    // The outer object held only "key", so it must now be exhausted.
    assert!(reader.next_object_entry().await.unwrap().is_none());
}
#[tokio::test]
async fn test_invalid_unicode_hex_error() {
    // `\u123G`: the last of the four escape digits is not a hex character.
    let bad_escape = r#"{"key": "\u123G"}"#;
    let mut reader = create_reader(bad_escape);

    let key = reader.next_object_entry().await.unwrap().unwrap();
    assert_eq!(key, "key");

    assert_matches!(
        reader.read_string().await,
        Err(AsyncJsonStreamReaderError::JsonError { error, .. }) if error.contains("Invalid hex value in unicode escape sequence")
    );
}
#[tokio::test]
async fn test_number_parsing() {
    /// Lexes `json` and requires exactly one `Number` token whose text equals
    /// `expected_num_str`, followed by end of input.
    async fn expect_number(json: &str, expected_num_str: &str) {
        let mut reader = create_reader(json);
        let first = reader.next_token().await.unwrap();
        assert_matches!(first, Some(JsonToken::Number(s)) if s == expected_num_str);
        let second = reader.next_token().await.unwrap();
        assert_matches!(second, None);
    }

    /// Requires the very first `next_token` call on `json` to fail with an error whose
    /// `Debug` rendering contains `expected_error_part`.
    async fn expect_error_on_first_token(json: &str, expected_error_part: &str) {
        let mut reader = create_reader(json);
        let outcome = reader.next_token().await;
        assert_matches!(
            outcome,
            Err(e) if format!("{:?}", e).contains(expected_error_part)
        );
    }

    // Well-formed literals are reported verbatim as the token text.
    let well_formed = [
        "0", "123", "-123", "-0", "0.123", "-0.123", "123.456",
        "1e10", "1E10", "1e+10", "1e-10", "1.23e+10", "-1.23e-10", "0e0",
    ];
    for &literal in well_formed.iter() {
        expect_number(literal, literal).await;
    }

    // Malformed literals, paired with a fragment of the expected error.
    let malformed = [
        ("01", "leading zeros are not allowed"),
        ("-01", "leading zeros are not allowed"),
        ("1.", "unexpected EOF after decimal point"),
        ("1.e10", "Invalid character after decimal point"),
        ("1e", "unexpected EOF after exponent"),
        ("1E+", "unexpected EOF after exponent"),
        ("-", "InvalidNumber"),
        ("--1", "InvalidNumber"),
    ];
    for &(input, fragment) in malformed.iter() {
        expect_error_on_first_token(input, fragment).await;
    }
}
#[tokio::test]
async fn test_invalid_number_sequence() {
let mut reader = create_reader("1.2.3");
assert_matches!(reader.next_token().await.unwrap(), Some(JsonToken::Number(s)) if s == "1.2");
assert_matches!(reader.next_token().await, Err(AsyncJsonStreamReaderError::JsonError { error, .. }) if error.contains("Unexpected JSON character: ."));
let mut reader = create_reader("1a");
assert_matches!(reader.next_token().await.unwrap(), Some(JsonToken::Number(s)) if s == "1");
assert_matches!(reader.next_token().await, Err(AsyncJsonStreamReaderError::JsonError { error, .. }) if error.contains("Unexpected JSON character: a"));
}
#[tokio::test]
async fn test_number_crosses_chunk_boundary() {
    /// Feeds `part1` then `part2` through a chained reader, so the two halves come from
    /// separate underlying reads. Asserts exactly one `Number(expected)` token followed by
    /// end of input.
    async fn check_number_boundary(part1: &str, part2: &str, expected: &str) {
        let reader_p1 = Cursor::new(part1.as_bytes().to_vec());
        let reader_p2 = Cursor::new(part2.as_bytes().to_vec());
        let chained_reader = tokio::io::AsyncReadExt::chain(reader_p1, reader_p2);
        let mut reader = AsyncJsonStreamReader::new(chained_reader);
        assert_matches!(
            reader.next_token().await.unwrap(),
            Some(JsonToken::Number(s)) if s == expected
        );
        // Without this, a reader that produced the right number but then emitted a stray
        // token from the second chunk (e.g. by re-lexing its bytes) would still pass.
        assert_matches!(reader.next_token().await.unwrap(), None);
    }
    // Split right after the sign, at each section transition, and inside each section.
    check_number_boundary("-", "123", "-123").await;
    check_number_boundary("12", "34", "1234").await;
    check_number_boundary("123", ".456", "123.456").await;
    check_number_boundary("123.", "456", "123.456").await;
    check_number_boundary("123.456", "e10", "123.456e10").await;
    check_number_boundary("123.456e", "+10", "123.456e+10").await;
    check_number_boundary("123.456e+", "10", "123.456e+10").await;
    check_number_boundary("1e1", "0", "1e10").await;
}
}