use std::collections::HashMap;
use jiter::{Jiter, JiterErrorType, JsonErrorType, NumberAny, NumberInt, Peek};
use pjson_rs_domain::value_objects::JsonData;
use crate::Result;
use crate::error::Error;
use crate::stream::priority::JsonPath;
/// Sealed-trait plumbing: downstream crates cannot name `private::Sealed`,
/// so they cannot implement [`PartialJsonParser`] themselves.
mod private {
    pub trait Sealed {}
}
/// Parser that tolerates truncated JSON, recovering a best-effort partial
/// value instead of failing outright.
///
/// Sealed: only implementations inside this crate are permitted.
pub trait PartialJsonParser: private::Sealed {
    /// Parse `input`, returning what could be recovered plus bookkeeping
    /// (bytes consumed, completeness, diagnostics, streaming hint).
    fn parse_partial(&self, input: &[u8]) -> Result<PartialParseResult>;
}
/// Outcome of a [`PartialJsonParser::parse_partial`] call.
#[derive(Debug, Clone)]
pub struct PartialParseResult {
    /// Best-effort value recovered from the input (may be partial).
    pub value: JsonData,
    /// Bytes of input committed, including whitespace trailing a complete value.
    pub consumed: usize,
    /// True when the entire input was consumed as one complete value.
    pub is_complete: bool,
    /// Stable/tentative leaf-path split for streaming consumers, when enabled.
    pub streaming_hint: Option<StreamingHint>,
    /// Non-fatal findings (duplicate keys, lossy big-int conversions).
    pub diagnostics: Vec<ParseDiagnostic>,
}
impl PartialParseResult {
fn empty() -> Self {
Self {
value: JsonData::Null,
consumed: 0,
is_complete: false,
streaming_hint: None,
diagnostics: vec![],
}
}
}
/// Non-fatal issue discovered while parsing; parsing continues.
#[derive(Debug, Clone, PartialEq)]
pub enum ParseDiagnostic {
    /// An object contained the same key more than once; the later value
    /// overwrote the earlier one (last-write-wins).
    DuplicateKey {
        /// Path of the object holding the duplicate (not the member itself).
        path: JsonPath,
        key: String,
    },
    /// An integer too large for `i64` was converted to `f64`, possibly
    /// losing precision.
    BigIntLossyConversion {
        path: JsonPath,
        /// Original decimal digits exactly as written in the input.
        original: String,
        converted: f64,
    },
}
/// Hint for streaming consumers: which leaf paths are safe to act on now.
#[derive(Debug, Clone)]
pub struct StreamingHint {
    /// Leaf paths whose values will not change as more input arrives.
    pub stable_paths: Vec<JsonPath>,
    /// Leaf paths whose values may still change — at most the final leaf
    /// of a truncated parse.
    pub tentative_paths: Vec<JsonPath>,
}
/// Tuning knobs for [`JiterPartialParser`].
#[derive(Debug, Clone)]
pub struct JiterConfig {
    /// Inputs longer than this are rejected with a buffer error.
    pub max_input_size: usize,
    /// Maximum nesting depth before a recursion-limit error is raised.
    pub max_depth: usize,
    /// Whether to compute a [`StreamingHint`] for container values.
    pub emit_streaming_hint: bool,
    /// Forwarded to jiter's partial-string mode (tolerate strings cut at EOF).
    pub allow_trailing_strings: bool,
    /// Forwarded to jiter: accept `Infinity`/`NaN` literals.
    pub allow_inf_nan: bool,
}
impl Default for JiterConfig {
fn default() -> Self {
Self {
max_input_size: 100 * 1024 * 1024,
max_depth: 64,
emit_streaming_hint: true,
allow_trailing_strings: true,
allow_inf_nan: false,
}
}
}
/// [`PartialJsonParser`] implementation backed by the `jiter` crate.
#[derive(Debug, Clone)]
pub struct JiterPartialParser {
    /// Immutable after construction; every parse call is independent.
    config: JiterConfig,
}
// Opt this parser into the sealed `PartialJsonParser` trait.
impl private::Sealed for JiterPartialParser {}
impl JiterPartialParser {
    /// Build a parser with an explicit configuration.
    pub fn new(config: JiterConfig) -> Self {
        Self { config }
    }
}
impl Default for JiterPartialParser {
    /// Parser using [`JiterConfig::default`] settings.
    fn default() -> Self {
        Self::new(JiterConfig::default())
    }
}
impl PartialJsonParser for JiterPartialParser {
    /// Parse `input`, tolerating truncation at the end of the buffer.
    ///
    /// Returns `Err` only for oversized input or genuinely malformed
    /// (non-truncated) JSON; truncation yields `Ok` with `is_complete == false`.
    fn parse_partial(&self, input: &[u8]) -> Result<PartialParseResult> {
        // Reject oversized buffers up front.
        if input.len() > self.config.max_input_size {
            return Err(Error::Buffer(format!(
                "input length {} exceeds max_input_size {}",
                input.len(),
                self.config.max_input_size
            )));
        }
        // Empty or whitespace-only input: nothing to parse, nothing consumed.
        if input.iter().all(|b| b.is_ascii_whitespace()) {
            return Ok(PartialParseResult::empty());
        }
        // Configure the underlying jiter instance per our tolerances.
        let mut jiter = Jiter::new(input);
        if self.config.allow_inf_nan {
            jiter = jiter.with_allow_inf_nan();
        }
        if self.config.allow_trailing_strings {
            jiter = jiter.with_allow_partial_strings();
        }
        let root = JsonPath::root();
        match walk(&mut jiter, 0, &root, &self.config) {
            WalkOutcome::Complete {
                value,
                cursor,
                diagnostics,
            } => {
                // A complete value also owns any whitespace that follows it.
                let consumed = cursor + count_trailing_whitespace(input, cursor);
                let is_complete = consumed >= input.len();
                let streaming_hint = self
                    .config
                    .emit_streaming_hint
                    .then(|| build_streaming_hint(&value, &root, false))
                    .flatten();
                Ok(PartialParseResult {
                    value,
                    consumed,
                    is_complete,
                    streaming_hint,
                    diagnostics,
                })
            }
            WalkOutcome::Truncated {
                partial,
                cursor,
                diagnostics,
            } => {
                let streaming_hint = self
                    .config
                    .emit_streaming_hint
                    .then(|| build_streaming_hint(&partial, &root, true))
                    .flatten();
                Ok(PartialParseResult {
                    value: partial,
                    consumed: cursor,
                    is_complete: false,
                    streaming_hint,
                    diagnostics,
                })
            }
            WalkOutcome::Hard(e) => Err(Error::invalid_json(e.index, e.error_type.to_string())),
        }
    }
}
/// Internal result of walking one JSON value.
enum WalkOutcome {
    /// Value fully parsed; `cursor` is the byte index just past it.
    Complete {
        value: JsonData,
        cursor: usize,
        diagnostics: Vec<ParseDiagnostic>,
    },
    /// Input ended mid-value; `partial` is the best-effort recovery and
    /// `cursor` the last byte position safe to treat as committed.
    Truncated {
        partial: JsonData,
        cursor: usize,
        diagnostics: Vec<ParseDiagnostic>,
    },
    /// Unrecoverable syntax error — not mere truncation.
    Hard(jiter::JiterError),
}
/// True when `e` is a truncation-style error — EOF inside a value, string,
/// or container, or a container cut off where a comma/closer was expected —
/// that partial parsing tolerates rather than propagating as hard failure.
fn is_partial_tolerated(e: &jiter::JiterError) -> bool {
    match &e.error_type {
        JiterErrorType::JsonError(kind) => matches!(
            kind,
            JsonErrorType::EofWhileParsingList
                | JsonErrorType::EofWhileParsingObject
                | JsonErrorType::EofWhileParsingString
                | JsonErrorType::EofWhileParsingValue
                | JsonErrorType::ExpectedListCommaOrEnd
                | JsonErrorType::ExpectedObjectCommaOrEnd
        ),
        _ => false,
    }
}
/// True when `e` signals EOF before any token of a value was seen.
fn is_eof_value(e: &jiter::JiterError) -> bool {
    if let JiterErrorType::JsonError(kind) = &e.error_type {
        matches!(kind, JsonErrorType::EofWhileParsingValue)
    } else {
        false
    }
}
/// Count consecutive ASCII-whitespace bytes starting at byte offset `from`.
///
/// Used to fold whitespace following a complete value into the `consumed`
/// count. Returns 0 when `from` is at or beyond the end of `input` —
/// the previous `input[from..]` slice panicked on an out-of-range offset.
fn count_trailing_whitespace(input: &[u8], from: usize) -> usize {
    input.get(from..).map_or(0, |tail| {
        tail.iter().take_while(|b| b.is_ascii_whitespace()).count()
    })
}
fn walk(jiter: &mut Jiter<'_>, depth: usize, path: &JsonPath, config: &JiterConfig) -> WalkOutcome {
if depth >= config.max_depth {
return WalkOutcome::Hard(jiter::JiterError {
error_type: JiterErrorType::JsonError(JsonErrorType::RecursionLimitExceeded),
index: jiter.current_index(),
});
}
let pre = jiter.current_index();
let peek = match jiter.peek() {
Ok(p) => p,
Err(e) if is_eof_value(&e) => {
return WalkOutcome::Truncated {
partial: JsonData::Null,
cursor: pre,
diagnostics: vec![],
};
}
Err(e) => return WalkOutcome::Hard(e),
};
match peek {
Peek::Null => match jiter.known_null() {
Ok(()) => WalkOutcome::Complete {
value: JsonData::Null,
cursor: jiter.current_index(),
diagnostics: vec![],
},
Err(e) if is_partial_tolerated(&e) => WalkOutcome::Truncated {
partial: JsonData::Null,
cursor: pre,
diagnostics: vec![],
},
Err(e) => WalkOutcome::Hard(e),
},
Peek::True | Peek::False => match jiter.known_bool(peek) {
Ok(b) => WalkOutcome::Complete {
value: JsonData::Bool(b),
cursor: jiter.current_index(),
diagnostics: vec![],
},
Err(e) if is_partial_tolerated(&e) => WalkOutcome::Truncated {
partial: JsonData::Null,
cursor: pre,
diagnostics: vec![],
},
Err(e) => WalkOutcome::Hard(e),
},
Peek::String => walk_string(jiter, pre, config),
p if p.is_num() => walk_number(jiter, peek, pre, path),
Peek::Array => {
let pre_open = jiter.current_index();
walk_array(jiter, depth + 1, path, config, pre_open)
}
Peek::Object => {
let pre_open = jiter.current_index();
walk_object(jiter, depth + 1, path, config, pre_open)
}
_ => {
WalkOutcome::Hard(jiter::JiterError {
error_type: JiterErrorType::JsonError(JsonErrorType::ExpectedSomeValue),
index: pre,
})
}
}
}
fn walk_string(jiter: &mut Jiter<'_>, pre_open: usize, _config: &JiterConfig) -> WalkOutcome {
match jiter.known_str() {
Ok(s) => {
let owned = s.to_owned();
WalkOutcome::Complete {
value: JsonData::String(owned),
cursor: jiter.current_index(),
diagnostics: vec![],
}
}
Err(e)
if matches!(
e.error_type,
JiterErrorType::JsonError(JsonErrorType::EofWhileParsingString)
) =>
{
WalkOutcome::Truncated {
partial: JsonData::Null,
cursor: pre_open,
diagnostics: vec![],
}
}
Err(e) => WalkOutcome::Hard(e),
}
}
/// Parse a numeric token.
///
/// - `i64`-sized integers become `JsonData::Integer`.
/// - Larger integers are converted to `f64` (lossy) and flagged with a
///   [`ParseDiagnostic::BigIntLossyConversion`] at `path`.
/// - EOF mid-number is a truncation anchored at `pre`: the incomplete
///   number is dropped, not committed.
/// - Any other error — including `InvalidNumber` — is a hard failure.
///   (A dedicated `InvalidNumber` arm previously duplicated the catch-all
///   `Err(e) => Hard(e)` verbatim; it has been removed as dead code.)
fn walk_number(jiter: &mut Jiter<'_>, peek: Peek, pre: usize, path: &JsonPath) -> WalkOutcome {
    match jiter.known_number(peek) {
        Ok(NumberAny::Int(NumberInt::Int(i))) => WalkOutcome::Complete {
            value: JsonData::Integer(i),
            cursor: jiter.current_index(),
            diagnostics: vec![],
        },
        Ok(NumberAny::Int(NumberInt::BigInt(b))) => {
            let original = b.to_string();
            // f64::from_str accepts any decimal digit string, so this cannot
            // fail; out-of-range magnitudes saturate to infinity.
            let converted: f64 = original
                .parse()
                .expect("BigInt produces parseable decimal for f64::from_str");
            WalkOutcome::Complete {
                value: JsonData::Float(converted),
                cursor: jiter.current_index(),
                diagnostics: vec![ParseDiagnostic::BigIntLossyConversion {
                    path: path.clone(),
                    original,
                    converted,
                }],
            }
        }
        Ok(NumberAny::Float(f)) => WalkOutcome::Complete {
            value: JsonData::Float(f),
            cursor: jiter.current_index(),
            diagnostics: vec![],
        },
        Err(e)
            if matches!(
                e.error_type,
                JiterErrorType::JsonError(JsonErrorType::EofWhileParsingValue)
            ) =>
        {
            WalkOutcome::Truncated {
                partial: JsonData::Null,
                cursor: pre,
                diagnostics: vec![],
            }
        }
        Err(e) => WalkOutcome::Hard(e),
    }
}
/// Parse an array whose `[` has already been peeked; `pre_open` is the byte
/// index of that bracket.
fn walk_array(
    jiter: &mut Jiter<'_>,
    depth: usize,
    path: &JsonPath,
    config: &JiterConfig,
    pre_open: usize,
) -> WalkOutcome {
    // Consume the '[' and peek the first element (or detect `[]`).
    let mut peek = match jiter.known_array() {
        Ok(Some(p)) => p,
        Ok(None) => {
            return WalkOutcome::Complete {
                value: JsonData::Array(vec![]),
                cursor: jiter.current_index(),
                diagnostics: vec![],
            };
        }
        Err(e) if is_partial_tolerated(&e) => {
            // '[' with nothing usable after it: roll back to the bracket.
            return WalkOutcome::Truncated {
                partial: JsonData::Array(vec![]),
                cursor: pre_open,
                diagnostics: vec![],
            };
        }
        Err(e) => return WalkOutcome::Hard(e),
    };
    let mut items: Vec<JsonData> = Vec::new();
    let mut diags: Vec<ParseDiagnostic> = Vec::new();
    // Byte index just past the most recent fully-parsed element. The
    // initializer is dead (the first read always follows a completed
    // element), but the compiler cannot prove that — hence the allow.
    #[allow(unused_assignments)]
    let mut committed = pre_open;
    loop {
        let child_path = path.append_index(items.len());
        match walk_with_peek(jiter, peek, depth, &child_path, config) {
            WalkOutcome::Complete {
                value,
                cursor,
                diagnostics,
            } => {
                items.push(value);
                committed = cursor;
                diags.extend(diagnostics);
            }
            WalkOutcome::Truncated {
                partial,
                cursor,
                diagnostics,
            } => {
                // Keep the partial child and stop at its cursor.
                items.push(partial);
                diags.extend(diagnostics);
                return WalkOutcome::Truncated {
                    partial: JsonData::Array(items),
                    cursor,
                    diagnostics: diags,
                };
            }
            WalkOutcome::Hard(e) => return WalkOutcome::Hard(e),
        }
        match jiter.array_step() {
            Ok(Some(next)) => peek = next,
            Ok(None) => {
                return WalkOutcome::Complete {
                    value: JsonData::Array(items),
                    cursor: jiter.current_index(),
                    diagnostics: diags,
                };
            }
            Err(e) if is_partial_tolerated(&e) => {
                // Truncated between elements: drop the dangling separator,
                // keep everything parsed so far.
                return WalkOutcome::Truncated {
                    partial: JsonData::Array(items),
                    cursor: committed,
                    diagnostics: diags,
                };
            }
            Err(e) => return WalkOutcome::Hard(e),
        }
    }
}
/// Parse an object whose `{` has already been peeked; `pre_open` is the byte
/// index of that brace.
///
/// Duplicate keys follow last-write-wins semantics and emit a
/// [`ParseDiagnostic::DuplicateKey`] carrying the *object's* path (not the
/// member's). The duplicate-key bookkeeping was previously copy-pasted in
/// both the Complete and Truncated arms; it now lives in one helper.
fn walk_object(
    jiter: &mut Jiter<'_>,
    depth: usize,
    path: &JsonPath,
    config: &JiterConfig,
    pre_open: usize,
) -> WalkOutcome {
    // Insert `key -> value`, recording a duplicate-key diagnostic when the
    // key was already present (the new value overwrites the old).
    fn insert_with_dup_check(
        map: &mut HashMap<String, JsonData>,
        key: String,
        value: JsonData,
        object_path: &JsonPath,
        diags: &mut Vec<ParseDiagnostic>,
    ) {
        if map.contains_key(&key) {
            diags.push(ParseDiagnostic::DuplicateKey {
                path: object_path.clone(),
                key: key.clone(),
            });
        }
        map.insert(key, value);
    }
    // Consume the '{' and read the first key (or detect `{}`).
    let first_key: Option<String> = match jiter.known_object() {
        Ok(Some(k)) => Some(k.to_owned()),
        Ok(None) => {
            return WalkOutcome::Complete {
                value: JsonData::Object(HashMap::new()),
                cursor: jiter.current_index(),
                diagnostics: vec![],
            };
        }
        Err(e) if is_partial_tolerated(&e) => {
            // '{' with nothing usable after it: roll back to the brace.
            return WalkOutcome::Truncated {
                partial: JsonData::Object(HashMap::new()),
                cursor: pre_open,
                diagnostics: vec![],
            };
        }
        Err(e) => return WalkOutcome::Hard(e),
    };
    let mut map: HashMap<String, JsonData> = HashMap::new();
    // Byte index just past the most recent fully-parsed member value; falls
    // back to the opening brace before anything has been committed.
    let mut cursor_after_last_complete = pre_open;
    let mut next_key: Option<String> = first_key;
    let mut all_diagnostics: Vec<ParseDiagnostic> = vec![];
    loop {
        let key = next_key.take().expect("loop invariant: always set at top");
        let child_path = path.append_key(&key);
        // Peek the member value; EOF here drops the pending key entirely.
        let peek = match jiter.peek() {
            Ok(p) => p,
            Err(e) if is_partial_tolerated(&e) => {
                return WalkOutcome::Truncated {
                    partial: JsonData::Object(map),
                    cursor: cursor_after_last_complete,
                    diagnostics: all_diagnostics,
                };
            }
            Err(e) => return WalkOutcome::Hard(e),
        };
        match walk_with_peek(jiter, peek, depth, &child_path, config) {
            WalkOutcome::Complete {
                value,
                cursor,
                diagnostics,
            } => {
                insert_with_dup_check(&mut map, key, value, path, &mut all_diagnostics);
                cursor_after_last_complete = cursor;
                all_diagnostics.extend(diagnostics);
            }
            WalkOutcome::Truncated {
                partial,
                cursor,
                diagnostics,
            } => {
                // Keep the partial member and stop at its cursor.
                insert_with_dup_check(&mut map, key, partial, path, &mut all_diagnostics);
                all_diagnostics.extend(diagnostics);
                return WalkOutcome::Truncated {
                    partial: JsonData::Object(map),
                    cursor,
                    diagnostics: all_diagnostics,
                };
            }
            WalkOutcome::Hard(e) => return WalkOutcome::Hard(e),
        }
        match jiter.next_key() {
            Ok(Some(k)) => next_key = Some(k.to_owned()),
            Ok(None) => {
                return WalkOutcome::Complete {
                    value: JsonData::Object(map),
                    cursor: jiter.current_index(),
                    diagnostics: all_diagnostics,
                };
            }
            Err(e) if is_partial_tolerated(&e) => {
                // Truncated between members: keep everything parsed so far.
                return WalkOutcome::Truncated {
                    partial: JsonData::Object(map),
                    cursor: cursor_after_last_complete,
                    diagnostics: all_diagnostics,
                };
            }
            Err(e) => return WalkOutcome::Hard(e),
        }
    }
}
/// Dispatch on an already-peeked token and parse one value.
///
/// `depth` is the current nesting depth; containers recurse with
/// `depth + 1` and the limit is enforced here.
fn walk_with_peek(
    jiter: &mut Jiter<'_>,
    peek: Peek,
    depth: usize,
    path: &JsonPath,
    config: &JiterConfig,
) -> WalkOutcome {
    if depth >= config.max_depth {
        return WalkOutcome::Hard(jiter::JiterError {
            error_type: JiterErrorType::JsonError(JsonErrorType::RecursionLimitExceeded),
            index: jiter.current_index(),
        });
    }
    // Token start: scalar truncations roll the cursor back here, and
    // containers use it as their opening-byte anchor.
    let start = jiter.current_index();
    match peek {
        Peek::Null => match jiter.known_null() {
            Ok(()) => WalkOutcome::Complete {
                value: JsonData::Null,
                cursor: jiter.current_index(),
                diagnostics: vec![],
            },
            Err(e) if is_partial_tolerated(&e) => WalkOutcome::Truncated {
                partial: JsonData::Null,
                cursor: start,
                diagnostics: vec![],
            },
            Err(e) => WalkOutcome::Hard(e),
        },
        Peek::True | Peek::False => match jiter.known_bool(peek) {
            Ok(flag) => WalkOutcome::Complete {
                value: JsonData::Bool(flag),
                cursor: jiter.current_index(),
                diagnostics: vec![],
            },
            Err(e) if is_partial_tolerated(&e) => WalkOutcome::Truncated {
                partial: JsonData::Null,
                cursor: start,
                diagnostics: vec![],
            },
            Err(e) => WalkOutcome::Hard(e),
        },
        Peek::String => walk_string(jiter, start, config),
        Peek::Array => walk_array(jiter, depth + 1, path, config, start),
        Peek::Object => walk_object(jiter, depth + 1, path, config, start),
        other if other.is_num() => walk_number(jiter, peek, start, path),
        _ => WalkOutcome::Hard(jiter::JiterError {
            error_type: JiterErrorType::JsonError(JsonErrorType::ExpectedSomeValue),
            index: start,
        }),
    }
}
/// Build a [`StreamingHint`] for container values; scalars yield `None`.
///
/// For a truncated parse, the last collected leaf is the one that may still
/// change as more input arrives, so it is demoted to `tentative_paths`;
/// every other leaf is stable. Complete parses mark all leaves stable.
fn build_streaming_hint(
    value: &JsonData,
    root: &JsonPath,
    is_truncated: bool,
) -> Option<StreamingHint> {
    if !matches!(value, JsonData::Object(_) | JsonData::Array(_)) {
        return None;
    }
    let mut stable: Vec<JsonPath> = Vec::new();
    collect_leaves(value, root, &mut stable);
    let mut tentative: Vec<JsonPath> = Vec::new();
    if is_truncated {
        // Pop the most recent leaf (if any) into the tentative bucket.
        if let Some(last) = stable.pop() {
            tentative.push(last);
        }
    }
    Some(StreamingHint {
        stable_paths: stable,
        tentative_paths: tentative,
    })
}
/// Depth-first traversal appending the path of every scalar leaf to `out`.
///
/// Containers contribute no path of their own, so an empty container
/// yields nothing.
fn collect_leaves(value: &JsonData, path: &JsonPath, out: &mut Vec<JsonPath>) {
    match value {
        JsonData::Object(map) => {
            for (key, child) in map {
                collect_leaves(child, &path.append_key(key), out);
            }
        }
        JsonData::Array(items) => {
            for (idx, child) in items.iter().enumerate() {
                collect_leaves(child, &path.append_index(idx), out);
            }
        }
        // Scalar: record its own path.
        _ => out.push(path.clone()),
    }
}
#[cfg(test)]
mod tests {
    //! Covers empty/whitespace input, complete values, truncation boundaries
    //! (mid-string, mid-number, mid-key, partial keywords), duplicate-key
    //! diagnostics, and streaming-hint construction.
    use super::*;

    // Parser under test: default config (hints on, partial strings tolerated).
    fn parser() -> JiterPartialParser {
        JiterPartialParser::default()
    }

    // --- empty / whitespace-only inputs ---

    #[test]
    fn test_empty_input_returns_consumed_zero() {
        let r = parser().parse_partial(b"").unwrap();
        assert_eq!(r.consumed, 0);
        assert!(!r.is_complete);
        assert!(r.diagnostics.is_empty());
        assert!(matches!(r.value, JsonData::Null));
    }

    #[test]
    fn test_whitespace_only_returns_consumed_zero() {
        let r = parser().parse_partial(b" \n\t").unwrap();
        assert_eq!(r.consumed, 0);
        assert!(!r.is_complete);
    }

    // --- lone container openers commit nothing ---

    #[test]
    fn test_lone_open_bracket_consumed_zero() {
        let r = parser().parse_partial(b"[").unwrap();
        assert_eq!(r.consumed, 0, "open array must not advance consumed");
        assert!(!r.is_complete);
        assert!(matches!(r.value, JsonData::Array(ref v) if v.is_empty()));
    }

    #[test]
    fn test_lone_open_brace_consumed_zero() {
        let r = parser().parse_partial(b"{").unwrap();
        assert_eq!(r.consumed, 0, "open object must not advance consumed");
        assert!(!r.is_complete);
        assert!(matches!(r.value, JsonData::Object(ref m) if m.is_empty()));
    }

    // --- complete values ---

    #[test]
    fn test_complete_null() {
        let r = parser().parse_partial(b"null").unwrap();
        assert!(r.is_complete);
        assert_eq!(r.consumed, 4);
        assert!(matches!(r.value, JsonData::Null));
    }

    #[test]
    fn test_complete_int_with_trailing_whitespace() {
        // Trailing whitespace after a complete value counts as consumed.
        let r = parser().parse_partial(b"42 ").unwrap();
        assert!(r.is_complete);
        assert_eq!(r.consumed, 3);
        assert!(matches!(r.value, JsonData::Integer(42)));
    }

    #[test]
    fn test_oversized_input_returns_error() {
        let config = JiterConfig {
            max_input_size: 4,
            ..Default::default()
        };
        let p = JiterPartialParser::new(config);
        let result = p.parse_partial(b"12345");
        assert!(result.is_err());
    }

    #[test]
    fn test_complete_json_is_complete_true() {
        let r = parser().parse_partial(br#"{"a":1,"b":true}"#).unwrap();
        assert!(r.is_complete);
        assert!(r.diagnostics.is_empty());
        if let JsonData::Object(map) = &r.value {
            assert_eq!(map.get("a"), Some(&JsonData::Integer(1)));
            assert_eq!(map.get("b"), Some(&JsonData::Bool(true)));
        } else {
            panic!("expected Object");
        }
    }

    #[test]
    fn test_complete_array() {
        let r = parser().parse_partial(b"[1,2,3]").unwrap();
        assert!(r.is_complete);
        assert!(matches!(
            r.value,
            JsonData::Array(ref v) if v.len() == 3
        ));
    }

    // --- truncated containers recover partial values ---

    #[test]
    fn test_truncated_object_partial_recovered() {
        let r = parser().parse_partial(b"{\"key\": \"val").unwrap();
        assert!(!r.is_complete);
    }

    #[test]
    fn test_truncated_array_partial_recovered() {
        let r = parser().parse_partial(b"[1,2,").unwrap();
        assert!(!r.is_complete);
        if let JsonData::Array(items) = &r.value {
            assert!(
                items.len() >= 2,
                "expected at least 2 items, got {:?}",
                items
            );
        }
    }

    // --- truncation boundary cases (behavior pinned to jiter 0.14) ---

    #[test]
    fn test_boundary_a_mid_hex_escape_tolerated_as_partial_string() {
        let result = parser().parse_partial(b"\"\\u00");
        assert!(
            result.is_ok(),
            "mid-hex-escape is tolerated by jiter 0.14 partial-string mode"
        );
        let r = result.unwrap();
        assert!(matches!(r.value, JsonData::String(_)));
        assert_eq!(r.consumed, 5, "all 5 bytes consumed");
        assert!(r.is_complete, "consumed == input.len() so is_complete");
    }

    #[test]
    fn test_boundary_b_truncated_number_tolerated() {
        let result = parser().parse_partial(b"1.");
        assert!(result.is_ok(), "jiter tolerates `1.` as truncation");
        let r = result.unwrap();
        assert_eq!(r.consumed, 0, "number dropped: nothing committed");
        assert!(!r.is_complete);
    }

    #[test]
    fn test_boundary_c_mid_key_drops_key() {
        let r = parser().parse_partial(b"{\"ke").unwrap();
        assert!(!r.is_complete);
        assert!(matches!(r.value, JsonData::Object(_)));
    }

    #[test]
    fn test_boundary_d_bare_minus_tolerated() {
        let r = parser().parse_partial(b"-").unwrap();
        assert!(!r.is_complete);
        assert_eq!(r.consumed, 0);
    }

    #[test]
    fn test_boundary_e_partial_keyword_tolerated_as_truncation() {
        let result = parser().parse_partial(b"tru");
        assert!(
            result.is_ok(),
            "partial keyword `tru` is tolerated as truncation in jiter 0.14"
        );
        let r = result.unwrap();
        assert!(!r.is_complete);
        assert_eq!(r.consumed, 0);
    }

    // --- diagnostics ---

    #[test]
    fn test_duplicate_key_last_write_wins() {
        let r = parser().parse_partial(b"{\"x\":1,\"x\":2}").unwrap();
        assert!(r.is_complete);
        if let JsonData::Object(map) = &r.value {
            assert_eq!(
                map.get("x"),
                Some(&JsonData::Integer(2)),
                "last-write-wins: x should be 2"
            );
        } else {
            panic!("expected Object");
        }
        let dup = r
            .diagnostics
            .iter()
            .find(|d| matches!(d, ParseDiagnostic::DuplicateKey { key, .. } if key == "x"));
        assert!(dup.is_some(), "DuplicateKey diagnostic must be emitted");
    }

    #[test]
    fn test_nested_truncation_is_not_complete() {
        let r = parser().parse_partial(b"{\"a\":{\"b\":1").unwrap();
        assert!(!r.is_complete);
    }

    // --- streaming hints ---

    #[test]
    fn test_streaming_hint_populated_for_object() {
        let r = parser().parse_partial(br#"{"a":1}"#).unwrap();
        assert!(r.is_complete);
        let hint = r.streaming_hint.expect("hint must be Some for object");
        assert!(
            !hint.stable_paths.is_empty(),
            "complete object must have stable paths"
        );
        assert!(
            hint.tentative_paths.is_empty(),
            "complete parse must have no tentative paths"
        );
        assert!(
            hint.stable_paths
                .iter()
                .any(|p| p.last_key().as_deref() == Some("a")),
            "stable_paths should contain a path ending with key 'a'"
        );
    }

    #[test]
    fn test_streaming_hint_none_for_scalar() {
        let r = parser().parse_partial(b"42").unwrap();
        assert!(r.streaming_hint.is_none());
    }

    #[test]
    fn test_streaming_hint_truncated_string_at_eof_is_tentative() {
        let r = parser().parse_partial(b"{\"a\":\"hello").unwrap();
        assert!(!r.is_complete);
        let hint = r
            .streaming_hint
            .expect("hint must be Some for truncated object");
        assert_eq!(
            hint.tentative_paths.len(),
            1,
            "exactly one tentative path for truncated string leaf"
        );
    }

    #[test]
    fn test_streaming_hint_complete_object_all_stable() {
        let r = parser().parse_partial(br#"{"x":1,"y":2}"#).unwrap();
        assert!(r.is_complete);
        let hint = r
            .streaming_hint
            .expect("hint must be Some for complete object");
        assert_eq!(
            hint.tentative_paths.len(),
            0,
            "no tentative paths for complete object"
        );
        assert_eq!(
            hint.stable_paths.len(),
            2,
            "two stable leaf paths for two-field object"
        );
    }
}