use crate::JsonStreamEvent;
use crate::decode::decoders as decoder_impl;
use crate::decode::parser::{
is_array_header_content, is_key_value_content, parse_array_header_line, parse_key_token,
parse_primitive_token,
};
use crate::decode::scanner::{
Depth, ParsedLine, StreamingScanState, create_scan_state, parse_line_incremental,
};
use crate::error::{Result, ToonError};
use crate::options::DecodeStreamOptions;
use crate::shared::constants::{COLON, DEFAULT_DELIMITER, LIST_ITEM_PREFIX};
use crate::shared::string_utils::find_closing_quote;
use asupersync::stream::{Stream, StreamExt, iter};
use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Internal state machine for the incremental decoder.
///
/// The decoder starts in [`DecoderState::Initial`], commits to one of the
/// other states after inspecting input, and ends in [`DecoderState::Finished`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DecoderState {
    /// No structural line has been processed yet; document shape is unknown.
    Initial,
    /// Flat top-level `key: value` pairs are being emitted as lines arrive.
    SimpleObject { base_depth: Depth },
    /// Input contains an array (or other structure the incremental path does
    /// not handle); lines are buffered and batch-decoded at end of input.
    ArrayMode,
    /// All events have been produced; the stream yields no further items.
    Finished,
}
/// Depth of an open object context on the decoder's context stack.
type ObjectDepth = Depth;
/// Incrementally decodes TOON-formatted lines into [`JsonStreamEvent`]s.
///
/// Simple top-level `key: value` objects are emitted event-by-event as lines
/// arrive; anything more complex (array headers, list items, keys with no
/// inline value) switches the decoder into a buffering mode that defers to
/// the synchronous batch decoder once the input is exhausted.
pub struct AsyncDecodeStream<I: Iterator<Item = String>> {
    /// Source of raw input lines.
    lines: I,
    /// Decoding options (indent width, strictness).
    options: DecodeStreamOptions,
    /// Incremental scanner state shared across `parse_line_incremental` calls.
    scan_state: StreamingScanState,
    /// Events produced but not yet handed to the consumer.
    event_queue: VecDeque<JsonStreamEvent>,
    /// Current state-machine state.
    state: DecoderState,
    /// Depths of objects opened incrementally (closed during `finalize`).
    context_stack: Vec<ObjectDepth>,
    /// Parsed lines held back for batch decoding in `ArrayMode`.
    line_buffer: Vec<ParsedLine>,
    /// True once the underlying line iterator has returned `None`.
    lines_exhausted: bool,
    /// Deferred error to surface on the next poll.
    error: Option<ToonError>,
    /// Depth of the most recently processed line, used to detect dedents.
    last_depth: Option<Depth>,
}
impl<I: Iterator<Item = String>> AsyncDecodeStream<I> {
    /// Builds a decoder over `lines`, using default options when `options` is `None`.
    pub fn new(lines: I, options: Option<DecodeStreamOptions>) -> Self {
        let options = options.unwrap_or_default();
        Self {
            lines,
            options,
            scan_state: create_scan_state(),
            event_queue: VecDeque::new(),
            state: DecoderState::Initial,
            context_stack: Vec::new(),
            line_buffer: Vec::new(),
            lines_exhausted: false,
            error: None,
            last_depth: None,
        }
    }
    /// Indentation width used to compute line depth; defaults to 2.
    fn indent_size(&self) -> usize {
        self.options.indent.unwrap_or(2)
    }
    /// Whether parsing is strict; defaults to `true`.
    fn strict(&self) -> bool {
        self.options.strict.unwrap_or(true)
    }
    /// Advances the decoder by at most one input line.
    ///
    /// Returns `Ok(Some(event))` when an event is available, `Ok(None)` when
    /// progress was made but no event is ready yet (caller should poll again),
    /// and `Err` for a deferred or newly encountered parse error.
    fn process_next(&mut self) -> Result<Option<JsonStreamEvent>> {
        // Drain previously queued events before consuming more input.
        if let Some(event) = self.event_queue.pop_front() {
            return Ok(Some(event));
        }
        // Surface an error deferred from an earlier step.
        if let Some(err) = self.error.take() {
            return Err(err);
        }
        if self.state == DecoderState::Finished {
            return Ok(None);
        }
        if self.lines_exhausted {
            return self.finalize();
        }
        let Some(raw_line) = self.lines.next() else {
            self.lines_exhausted = true;
            return self.finalize();
        };
        let indent_size = self.indent_size();
        let strict = self.strict();
        let parsed = parse_line_incremental(&raw_line, &mut self.scan_state, indent_size, strict)?;
        // The scanner may absorb a line without producing a parsed line
        // (presumably blanks/continuations — semantics live in the scanner).
        let Some(line) = parsed else {
            return Ok(None);
        };
        match self.state {
            DecoderState::Initial => self.process_initial_line(line),
            DecoderState::SimpleObject { base_depth } => {
                self.process_simple_object_line(line, base_depth)
            }
            // In array mode every line is buffered for the batch decoder.
            DecoderState::ArrayMode => {
                self.line_buffer.push(line);
                Ok(None)
            }
            DecoderState::Finished => Ok(None),
        }
    }
    /// Classifies the first structural line and commits to a decoding mode.
    fn process_initial_line(&mut self, line: ParsedLine) -> Result<Option<JsonStreamEvent>> {
        // A valid array header at the top level means the whole document is
        // handled by the batch path.
        if is_array_header_content(&line.content)
            && parse_array_header_line(&line.content, DEFAULT_DELIMITER)?.is_some()
        {
            self.state = DecoderState::ArrayMode;
            self.line_buffer.push(line);
            return Ok(None);
        }
        if Self::is_key_value_line(&line) {
            let depth = line.depth;
            // NOTE(review): base_depth is pinned to 0 rather than `depth`;
            // confirm top-level lines always sit at depth 0 in this mode.
            self.state = DecoderState::SimpleObject { base_depth: 0 };
            self.context_stack.push(0);
            self.event_queue.push_back(JsonStreamEvent::StartObject);
            self.process_key_value_line(&line)?;
            self.last_depth = Some(depth);
            return Ok(self.event_queue.pop_front());
        }
        // Neither array nor key/value: the document is a single primitive.
        self.state = DecoderState::Finished;
        Ok(Some(JsonStreamEvent::Primitive {
            value: parse_primitive_token(line.content.trim())?,
        }))
    }
    /// Processes one line while incrementally emitting a flat object.
    fn process_simple_object_line(
        &mut self,
        line: ParsedLine,
        base_depth: Depth,
    ) -> Result<Option<JsonStreamEvent>> {
        let current_depth = line.depth;
        // On a dedent, close any object contexts deeper than the new line.
        // NOTE(review): only depth 0 is ever pushed onto context_stack in the
        // visible code, so with base_depth == 0 this loop appears inert —
        // verify against the rest of the module.
        if let Some(last_depth) = self.last_depth
            && current_depth < last_depth
        {
            while let Some(&obj_depth) = self.context_stack.last() {
                if obj_depth >= current_depth && obj_depth > base_depth {
                    self.context_stack.pop();
                    self.event_queue.push_back(JsonStreamEvent::EndObject);
                } else {
                    break;
                }
            }
        }
        // An array header mid-object switches to the buffered batch path.
        if is_array_header_content(&line.content)
            && parse_array_header_line(&line.content, DEFAULT_DELIMITER)?.is_some()
        {
            self.state = DecoderState::ArrayMode;
            self.line_buffer.push(line);
            return Ok(self.event_queue.pop_front());
        }
        self.process_key_value_line(&line)?;
        self.last_depth = Some(current_depth);
        Ok(self.event_queue.pop_front())
    }
    /// Emits `Key` + `Primitive` events for a `key: value` line, or falls
    /// back to `ArrayMode` buffering when the line is a list item or a key
    /// with no inline value (i.e. a nested structure).
    fn process_key_value_line(&mut self, line: &ParsedLine) -> Result<()> {
        let content = &line.content;
        if content.starts_with(LIST_ITEM_PREFIX) {
            self.state = DecoderState::ArrayMode;
            self.line_buffer.push(line.clone());
            return Ok(());
        }
        let (key, end, is_quoted) = parse_key_token(content, 0)?;
        let rest = content[end..].trim();
        if rest.is_empty() {
            // `key:` with nothing after it — nested value on following lines.
            self.state = DecoderState::ArrayMode;
            self.line_buffer.push(line.clone());
            return Ok(());
        }
        self.event_queue.push_back(JsonStreamEvent::Key {
            key,
            was_quoted: is_quoted,
        });
        self.event_queue.push_back(JsonStreamEvent::Primitive {
            value: parse_primitive_token(rest)?,
        });
        Ok(())
    }
    /// Returns true when `line` looks like a `key: value` entry (including a
    /// quoted key followed by a colon), and is not a list item.
    fn is_key_value_line(line: &ParsedLine) -> bool {
        let content = line.content.as_str();
        if content.starts_with(LIST_ITEM_PREFIX) {
            return false;
        }
        if content.starts_with('"') {
            // Quoted key: the colon must appear after the closing quote.
            if let Some(closing) = find_closing_quote(content, 0) {
                return content[closing + 1..].contains(COLON);
            }
            return false;
        }
        content.contains(COLON) && is_key_value_content(content)
    }
    /// Called once input is exhausted: either batch-decodes buffered lines or
    /// closes open object contexts. An empty document decodes to `{}`.
    fn finalize(&mut self) -> Result<Option<JsonStreamEvent>> {
        if !self.line_buffer.is_empty() || self.state == DecoderState::ArrayMode {
            return self.batch_decode_remaining();
        }
        // Close every object opened on the incremental path.
        while self.context_stack.pop().is_some() {
            self.event_queue.push_back(JsonStreamEvent::EndObject);
        }
        if self.state == DecoderState::Initial {
            self.state = DecoderState::Finished;
            // No structural lines at all: emit an empty object.
            self.event_queue.push_back(JsonStreamEvent::StartObject);
            self.event_queue.push_back(JsonStreamEvent::EndObject);
        } else {
            self.state = DecoderState::Finished;
        }
        Ok(self.event_queue.pop_front())
    }
    /// Drains any remaining input into the buffer, then re-decodes the
    /// buffered raw lines with the synchronous batch decoder and queues all
    /// resulting events.
    fn batch_decode_remaining(&mut self) -> Result<Option<JsonStreamEvent>> {
        let indent_size = self.indent_size();
        let strict = self.strict();
        for raw_line in self.lines.by_ref() {
            if let Some(line) =
                parse_line_incremental(&raw_line, &mut self.scan_state, indent_size, strict)?
            {
                self.line_buffer.push(line);
            }
        }
        // The batch decoder re-parses from raw text, so hand it the original
        // line contents rather than the parsed representations.
        let raw_lines: Vec<String> = self.line_buffer.iter().map(|p| p.raw.clone()).collect();
        let events = decoder_impl::decode_stream_sync(
            raw_lines,
            Some(DecodeStreamOptions {
                indent: self.options.indent,
                strict: self.options.strict,
            }),
        )?;
        self.event_queue.extend(events);
        self.line_buffer.clear();
        self.context_stack.clear();
        self.state = DecoderState::Finished;
        Ok(self.event_queue.pop_front())
    }
}
impl<I: Iterator<Item = String> + Unpin> Stream for AsyncDecodeStream<I> {
    type Item = Result<JsonStreamEvent>;

    /// Drives the decoder until it yields an event, fails, or is finished.
    ///
    /// The underlying line source is a plain iterator, so every call makes
    /// synchronous progress and `Poll::Pending` is never returned.
    fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            let step = self.process_next();
            match step {
                Err(err) => return Poll::Ready(Some(Err(err))),
                Ok(Some(event)) => return Poll::Ready(Some(Ok(event))),
                Ok(None) => {
                    // No event yet: terminate only once the state machine is
                    // finished and nothing remains queued; otherwise retry.
                    let drained =
                        self.state == DecoderState::Finished && self.event_queue.is_empty();
                    if drained {
                        return Poll::Ready(None);
                    }
                }
            }
        }
    }
}
/// Decodes an iterator of TOON lines into the full list of stream events.
///
/// Collects the input and delegates to the synchronous batch decoder.
///
/// # Errors
///
/// Returns any parse error produced by the underlying decoder.
pub async fn try_decode_stream_async(
    lines: impl IntoIterator<Item = String>,
    options: Option<DecodeStreamOptions>,
) -> Result<Vec<JsonStreamEvent>> {
    // Previously this cloned the entire line vector into a stream and counted
    // it, discarding the count — a pure O(n) allocation-and-traversal with no
    // observable effect. Decode the collected lines directly instead.
    let lines_vec: Vec<String> = lines.into_iter().collect();
    decoder_impl::decode_stream_sync(lines_vec, options)
}
/// Infallible variant of [`try_decode_stream_async`].
///
/// # Panics
///
/// Panics with the error's display message if decoding fails.
pub async fn decode_stream_async(
    lines: impl IntoIterator<Item = String>,
    options: Option<DecodeStreamOptions>,
) -> Vec<JsonStreamEvent> {
    match try_decode_stream_async(lines, options).await {
        Ok(events) => events,
        Err(err) => panic!("{err}"),
    }
}
/// Decodes a complete TOON document string into a JSON value.
///
/// Splits the input on `'\n'` (preserving empty trailing segments, unlike
/// `str::lines`), streams the lines through the event decoder, rebuilds a
/// node tree from the events, and optionally expands dotted key paths.
///
/// # Errors
///
/// Returns any error from decoding, node building, or path expansion.
pub async fn try_decode_async(
    input: &str,
    options: Option<crate::options::DecodeOptions>,
) -> Result<crate::JsonValue> {
    use crate::decode::event_builder::{build_node_from_events, node_to_json};
    use crate::decode::expand::expand_paths_safe;
    use crate::options::{ExpandPathsMode, resolve_decode_options};
    let resolved = resolve_decode_options(options);
    let stream_options = DecodeStreamOptions {
        indent: Some(resolved.indent),
        strict: Some(resolved.strict),
    };
    let split_lines: Vec<String> = input.split('\n').map(String::from).collect();
    let events = try_decode_stream_async(split_lines, Some(stream_options)).await?;
    let node = build_node_from_events(events)?;
    let expanded = if resolved.expand_paths == ExpandPathsMode::Safe {
        expand_paths_safe(node, resolved.strict)?
    } else {
        node
    };
    Ok(node_to_json(expanded))
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::StringOrNumberOrBoolOrNull;

    /// A freshly constructed stream starts in `Initial` with input pending.
    #[test]
    fn test_async_decode_stream_creation() {
        let lines = vec!["name: Alice".to_string(), "age: 30".to_string()];
        let stream = AsyncDecodeStream::new(lines.into_iter(), None);
        assert!(!stream.lines_exhausted);
        assert_eq!(stream.state, DecoderState::Initial);
    }

    /// A flat two-key object decodes to StartObject, Key/Primitive pairs,
    /// and EndObject via the incremental path.
    #[test]
    fn test_incremental_simple_object() {
        let lines = vec!["name: Alice".to_string(), "age: 30".to_string()];
        let mut stream = AsyncDecodeStream::new(lines.into_iter(), None);
        let mut events = Vec::new();
        // Drain the decoder synchronously, mirroring poll_next's loop.
        loop {
            match stream.process_next() {
                Ok(Some(event)) => events.push(event),
                Ok(None) => {
                    if stream.state == DecoderState::Finished && stream.event_queue.is_empty() {
                        break;
                    }
                }
                Err(e) => panic!("Error: {e}"),
            }
        }
        // StartObject + 2x(Key, Primitive) + EndObject = 6 events minimum.
        assert!(
            events.len() >= 6,
            "Expected at least 6 events, got {}",
            events.len()
        );
        assert!(matches!(events[0], JsonStreamEvent::StartObject));
        assert!(matches!(&events[1], JsonStreamEvent::Key { key, .. } if key == "name"));
        assert!(
            matches!(&events[2], JsonStreamEvent::Primitive { value: StringOrNumberOrBoolOrNull::String(s) } if s == "Alice")
        );
    }

    /// Events should appear before the whole input has been consumed,
    /// demonstrating true incremental emission.
    #[test]
    fn test_incremental_emits_events_as_lines_arrive() {
        let lines = vec![
            "first: 1".to_string(),
            "second: 2".to_string(),
            "third: 3".to_string(),
        ];
        let mut stream = AsyncDecodeStream::new(lines.into_iter(), None);
        let mut events_after_first_line = Vec::new();
        // Bounded loop: stop early once events stall or the input ends.
        for _ in 0..10 {
            match stream.process_next() {
                Ok(Some(event)) => events_after_first_line.push(event),
                Ok(None) => {
                    if !stream.lines_exhausted {
                        break;
                    }
                    if stream.state == DecoderState::Finished {
                        break;
                    }
                }
                Err(e) => panic!("Error: {e}"),
            }
        }
        assert!(
            !events_after_first_line.is_empty(),
            "Should have emitted events incrementally"
        );
    }

    /// An empty document decodes to exactly `{}` (StartObject, EndObject).
    #[test]
    fn test_empty_document() {
        let lines: Vec<String> = vec![];
        let mut stream = AsyncDecodeStream::new(lines.into_iter(), None);
        let mut events = Vec::new();
        loop {
            match stream.process_next() {
                Ok(Some(event)) => events.push(event),
                Ok(None) => {
                    if stream.state == DecoderState::Finished && stream.event_queue.is_empty() {
                        break;
                    }
                }
                Err(e) => panic!("Error: {e}"),
            }
        }
        assert_eq!(events.len(), 2);
        assert!(matches!(events[0], JsonStreamEvent::StartObject));
        assert!(matches!(events[1], JsonStreamEvent::EndObject));
    }
}