1use crate::JsonStreamEvent;
2use crate::decode::parser::{
3 FieldName, is_array_header_content, is_key_value_content, map_row_values_to_primitives,
4 parse_array_header_line, parse_delimited_values, parse_key_token, parse_primitive_token,
5};
6use crate::decode::scanner::{
7 Depth, ParsedLine, StreamingLineCursor, create_scan_state, parse_lines_sync,
8};
9use crate::decode::validation::{
10 assert_expected_count, validate_no_blank_lines_in_range, validate_no_extra_list_items,
11 validate_no_extra_tabular_rows,
12};
13use crate::error::{Result, ToonError};
14use crate::options::DecodeStreamOptions;
15use crate::shared::constants::{COLON, DEFAULT_DELIMITER, LIST_ITEM_MARKER, LIST_ITEM_PREFIX};
16use crate::shared::string_utils::find_closing_quote;
17
/// Resolved decoder settings shared by every decoding routine in this module.
///
/// Built once by `decode_stream_sync` from the caller's optional settings and
/// then passed by value (the type is `Copy`) to the nested decoders.
#[derive(Clone, Copy, Debug)]
pub struct DecoderContext {
    /// Indentation setting forwarded to the line scanner; `decode_stream_sync`
    /// uses `2` when the caller does not supply one.
    pub indent: usize,
    /// Strictness flag forwarded to the line scanner and to the count and
    /// layout validation helpers; `decode_stream_sync` uses `true` when the
    /// caller does not supply one.
    pub strict: bool,
}
23
24pub fn decode_stream_sync(
31 source: impl IntoIterator<Item = String>,
32 options: Option<DecodeStreamOptions>,
33) -> Result<Vec<JsonStreamEvent>> {
34 let options = options.unwrap_or(DecodeStreamOptions {
35 indent: None,
36 strict: None,
37 });
38 let context = DecoderContext {
39 indent: options.indent.unwrap_or(2),
40 strict: options.strict.unwrap_or(true),
41 };
42
43 let mut scan_state = create_scan_state();
44 let lines = parse_lines_sync(source, context.indent, context.strict, &mut scan_state)?;
45 let mut cursor = StreamingLineCursor::new(lines, scan_state.blank_lines);
46
47 let mut events = Vec::new();
48
49 let first = cursor.peek_sync().cloned();
50 let Some(first) = first else {
51 events.push(JsonStreamEvent::StartObject);
52 events.push(JsonStreamEvent::EndObject);
53 return Ok(events);
54 };
55
56 if is_array_header_content(&first.content)
57 && let Some(header_info) = parse_array_header_line(&first.content, DEFAULT_DELIMITER)?
58 {
59 cursor.advance_sync();
60 decode_array_from_header_sync(&mut events, header_info, &mut cursor, 0, context)?;
61 return Ok(events);
62 }
63
64 cursor.advance_sync();
65 let has_more = !cursor.at_end_sync();
66 if !has_more && !is_key_value_line_sync(&first) {
67 events.push(JsonStreamEvent::Primitive {
68 value: parse_primitive_token(first.content.trim())?,
69 });
70 return Ok(events);
71 }
72
73 events.push(JsonStreamEvent::StartObject);
74 decode_key_value_sync(&mut events, &first.content, &mut cursor, 0, context)?;
75
76 while !cursor.at_end_sync() {
77 let line = cursor.peek_sync().cloned();
78 let Some(line) = line else {
79 break;
80 };
81 if line.depth != 0 {
82 break;
83 }
84 cursor.advance_sync();
85 decode_key_value_sync(&mut events, &line.content, &mut cursor, 0, context)?;
86 }
87
88 events.push(JsonStreamEvent::EndObject);
89 Ok(events)
90}
91
92fn decode_key_value_sync(
93 events: &mut Vec<JsonStreamEvent>,
94 content: &str,
95 cursor: &mut StreamingLineCursor,
96 base_depth: Depth,
97 options: DecoderContext,
98) -> Result<()> {
99 if let Some(header_info) = parse_array_header_line(content, DEFAULT_DELIMITER)?
100 && let Some(key) = header_info.header.key.clone()
101 {
102 events.push(JsonStreamEvent::Key {
103 key,
104 was_quoted: header_info.header.key_was_quoted,
105 });
106 decode_array_from_header_sync(events, header_info, cursor, base_depth, options)?;
107 return Ok(());
108 }
109
110 let (key, end, is_quoted) = parse_key_token(content, 0)?;
111 let rest = content[end..].trim();
112
113 events.push(JsonStreamEvent::Key {
114 key,
115 was_quoted: is_quoted,
116 });
117
118 if rest.is_empty() {
119 let next_line = cursor.peek_sync();
120 if let Some(next) = next_line
121 && next.depth > base_depth
122 {
123 events.push(JsonStreamEvent::StartObject);
124 decode_object_fields_sync(events, cursor, base_depth + 1, options)?;
125 events.push(JsonStreamEvent::EndObject);
126 return Ok(());
127 }
128
129 events.push(JsonStreamEvent::StartObject);
130 events.push(JsonStreamEvent::EndObject);
131 return Ok(());
132 }
133
134 events.push(JsonStreamEvent::Primitive {
135 value: parse_primitive_token(rest)?,
136 });
137 Ok(())
138}
139
140fn decode_object_fields_sync(
141 events: &mut Vec<JsonStreamEvent>,
142 cursor: &mut StreamingLineCursor,
143 base_depth: Depth,
144 options: DecoderContext,
145) -> Result<()> {
146 let mut computed_depth: Option<Depth> = None;
147
148 while !cursor.at_end_sync() {
149 let line = cursor.peek_sync().cloned();
150 let Some(line) = line else {
151 break;
152 };
153 if line.depth < base_depth {
154 break;
155 }
156
157 if computed_depth.is_none() {
158 computed_depth = Some(line.depth);
159 }
160
161 if Some(line.depth) == computed_depth {
162 cursor.advance_sync();
163 decode_key_value_sync(events, &line.content, cursor, line.depth, options)?;
164 } else {
165 break;
166 }
167 }
168
169 Ok(())
170}
171
172fn decode_array_from_header_sync(
173 events: &mut Vec<JsonStreamEvent>,
174 header_info: crate::decode::parser::ArrayHeaderParseResult,
175 cursor: &mut StreamingLineCursor,
176 base_depth: Depth,
177 options: DecoderContext,
178) -> Result<()> {
179 let header = header_info.header;
180 let inline_values = header_info.inline_values;
181
182 events.push(JsonStreamEvent::StartArray {
183 length: header.length,
184 });
185
186 if let Some(inline_values) = inline_values {
187 decode_inline_primitive_array_sync(events, &header, &inline_values, options)?;
188 events.push(JsonStreamEvent::EndArray);
189 return Ok(());
190 }
191
192 if let Some(fields) = &header.fields
193 && !fields.is_empty()
194 {
195 decode_tabular_array_sync(events, &header, cursor, base_depth, options)?;
196 events.push(JsonStreamEvent::EndArray);
197 return Ok(());
198 }
199
200 decode_list_array_sync(events, &header, cursor, base_depth, options)?;
201 events.push(JsonStreamEvent::EndArray);
202 Ok(())
203}
204
205fn decode_inline_primitive_array_sync(
206 events: &mut Vec<JsonStreamEvent>,
207 header: &crate::decode::parser::ArrayHeaderInfo,
208 inline_values: &str,
209 options: DecoderContext,
210) -> Result<()> {
211 if inline_values.trim().is_empty() {
212 assert_expected_count(0, header.length, "inline array items", options.strict)?;
213 return Ok(());
214 }
215
216 let values = parse_delimited_values(inline_values, header.delimiter);
217 let primitives = map_row_values_to_primitives(&values)?;
218
219 assert_expected_count(
220 primitives.len(),
221 header.length,
222 "inline array items",
223 options.strict,
224 )?;
225
226 for primitive in primitives {
227 events.push(JsonStreamEvent::Primitive { value: primitive });
228 }
229
230 Ok(())
231}
232
233fn decode_tabular_array_sync(
234 events: &mut Vec<JsonStreamEvent>,
235 header: &crate::decode::parser::ArrayHeaderInfo,
236 cursor: &mut StreamingLineCursor,
237 base_depth: Depth,
238 options: DecoderContext,
239) -> Result<()> {
240 let row_depth = base_depth + 1;
241 let mut row_count = 0usize;
242 let mut start_line: Option<usize> = None;
243 let mut end_line: Option<usize> = None;
244
245 while !cursor.at_end_sync() && row_count < header.length {
246 let line = cursor.peek_sync().cloned();
247 let Some(line) = line else {
248 break;
249 };
250 if line.depth < row_depth {
251 break;
252 }
253
254 if line.depth == row_depth {
255 if start_line.is_none() {
256 start_line = Some(line.line_number);
257 }
258 end_line = Some(line.line_number);
259
260 cursor.advance_sync();
261 let values = parse_delimited_values(&line.content, header.delimiter);
262 let fields = header
263 .fields
264 .as_ref()
265 .ok_or_else(|| ToonError::message("Tabular array is missing header fields"))?;
266 assert_expected_count(
267 values.len(),
268 fields.len(),
269 "tabular row values",
270 options.strict,
271 )?;
272
273 let primitives = map_row_values_to_primitives(&values)?;
274 yield_object_from_fields(events, fields, &primitives);
275
276 row_count += 1;
277 } else {
278 break;
279 }
280 }
281
282 assert_expected_count(row_count, header.length, "tabular rows", options.strict)?;
283
284 if options.strict
285 && let (Some(start), Some(end)) = (start_line, end_line)
286 {
287 validate_no_blank_lines_in_range(
288 start,
289 end,
290 cursor.get_blank_lines(),
291 options.strict,
292 "tabular array",
293 )?;
294 }
295
296 validate_no_extra_tabular_rows(cursor.peek_sync(), row_depth, header, options.strict)?;
297 Ok(())
298}
299
300fn decode_list_array_sync(
301 events: &mut Vec<JsonStreamEvent>,
302 header: &crate::decode::parser::ArrayHeaderInfo,
303 cursor: &mut StreamingLineCursor,
304 base_depth: Depth,
305 options: DecoderContext,
306) -> Result<()> {
307 let item_depth = base_depth + 1;
308 let mut item_count = 0usize;
309 let mut start_line: Option<usize> = None;
310 let mut end_line: Option<usize> = None;
311
312 while !cursor.at_end_sync() && item_count < header.length {
313 let line = cursor.peek_sync().cloned();
314 let Some(line) = line else {
315 break;
316 };
317 if line.depth < item_depth {
318 break;
319 }
320
321 let is_list_item =
322 line.content.starts_with(LIST_ITEM_PREFIX) || line.content == LIST_ITEM_MARKER;
323 if line.depth == item_depth && is_list_item {
324 if start_line.is_none() {
325 start_line = Some(line.line_number);
326 }
327 end_line = Some(line.line_number);
328
329 decode_list_item_sync(events, cursor, item_depth, options)?;
330
331 if let Some(current) = cursor.current() {
332 end_line = Some(current.line_number);
333 }
334
335 item_count += 1;
336 } else {
337 break;
338 }
339 }
340
341 assert_expected_count(
342 item_count,
343 header.length,
344 "list array items",
345 options.strict,
346 )?;
347
348 if options.strict
349 && let (Some(start), Some(end)) = (start_line, end_line)
350 {
351 validate_no_blank_lines_in_range(
352 start,
353 end,
354 cursor.get_blank_lines(),
355 options.strict,
356 "list array",
357 )?;
358 }
359
360 validate_no_extra_list_items(
361 cursor.peek_sync(),
362 item_depth,
363 header.length,
364 options.strict,
365 )?;
366 Ok(())
367}
368
369fn decode_list_item_sync(
370 events: &mut Vec<JsonStreamEvent>,
371 cursor: &mut StreamingLineCursor,
372 base_depth: Depth,
373 options: DecoderContext,
374) -> Result<()> {
375 let line = cursor
376 .next_sync()
377 .ok_or_else(|| ToonError::message("Expected list item"))?;
378
379 if line.content == LIST_ITEM_MARKER {
380 events.push(JsonStreamEvent::StartObject);
381 events.push(JsonStreamEvent::EndObject);
382 return Ok(());
383 }
384
385 let after_hyphen = if line.content.starts_with(LIST_ITEM_PREFIX) {
386 line.content[LIST_ITEM_PREFIX.len()..].to_string()
387 } else {
388 return Err(ToonError::message(format!(
389 "Expected list item to start with \"{LIST_ITEM_PREFIX}\""
390 )));
391 };
392
393 if after_hyphen.trim().is_empty() {
394 events.push(JsonStreamEvent::StartObject);
395 events.push(JsonStreamEvent::EndObject);
396 return Ok(());
397 }
398
399 if is_array_header_content(&after_hyphen)
400 && let Some(header_info) = parse_array_header_line(&after_hyphen, DEFAULT_DELIMITER)?
401 {
402 decode_array_from_header_sync(events, header_info, cursor, base_depth, options)?;
403 return Ok(());
404 }
405
406 if let Some(header_info) = parse_array_header_line(&after_hyphen, DEFAULT_DELIMITER)?
407 && header_info.header.key.is_some()
408 && header_info.header.fields.is_some()
409 {
410 let header = header_info.header;
411 events.push(JsonStreamEvent::StartObject);
412 events.push(JsonStreamEvent::Key {
413 key: header.key.clone().unwrap_or_default(),
414 was_quoted: header.key_was_quoted,
415 });
416 decode_array_from_header_sync(
417 events,
418 crate::decode::parser::ArrayHeaderParseResult {
419 header,
420 inline_values: header_info.inline_values,
421 },
422 cursor,
423 base_depth + 1,
424 options,
425 )?;
426
427 let follow_depth = base_depth + 1;
428 while !cursor.at_end_sync() {
429 let next_line = cursor.peek_sync().cloned();
430 let Some(next_line) = next_line else {
431 break;
432 };
433 if next_line.depth < follow_depth {
434 break;
435 }
436 if next_line.depth == follow_depth && !next_line.content.starts_with(LIST_ITEM_PREFIX) {
437 cursor.advance_sync();
438 decode_key_value_sync(events, &next_line.content, cursor, follow_depth, options)?;
439 } else {
440 break;
441 }
442 }
443
444 events.push(JsonStreamEvent::EndObject);
445 return Ok(());
446 }
447
448 if is_key_value_content(&after_hyphen) {
449 events.push(JsonStreamEvent::StartObject);
450 decode_key_value_sync(events, &after_hyphen, cursor, base_depth + 1, options)?;
451
452 let follow_depth = base_depth + 1;
453 while !cursor.at_end_sync() {
454 let next_line = cursor.peek_sync().cloned();
455 let Some(next_line) = next_line else {
456 break;
457 };
458 if next_line.depth < follow_depth {
459 break;
460 }
461 if next_line.depth == follow_depth && !next_line.content.starts_with(LIST_ITEM_PREFIX) {
462 cursor.advance_sync();
463 decode_key_value_sync(events, &next_line.content, cursor, follow_depth, options)?;
464 } else {
465 break;
466 }
467 }
468
469 events.push(JsonStreamEvent::EndObject);
470 return Ok(());
471 }
472
473 events.push(JsonStreamEvent::Primitive {
474 value: parse_primitive_token(&after_hyphen)?,
475 });
476 Ok(())
477}
478
479fn yield_object_from_fields(
480 events: &mut Vec<JsonStreamEvent>,
481 fields: &[FieldName],
482 primitives: &[crate::JsonPrimitive],
483) {
484 events.push(JsonStreamEvent::StartObject);
485 for (idx, field) in fields.iter().enumerate() {
486 events.push(JsonStreamEvent::Key {
487 key: field.name.clone(),
488 was_quoted: field.was_quoted,
489 });
490 if let Some(value) = primitives.get(idx) {
491 events.push(JsonStreamEvent::Primitive {
492 value: value.clone(),
493 });
494 } else {
495 events.push(JsonStreamEvent::Primitive {
496 value: crate::StringOrNumberOrBoolOrNull::Null,
497 });
498 }
499 }
500 events.push(JsonStreamEvent::EndObject);
501}
502
503fn is_key_value_line_sync(line: &ParsedLine) -> bool {
504 let content = line.content.as_str();
505 if content.starts_with('"') {
506 if let Some(closing) = find_closing_quote(content, 0) {
507 return content[closing + 1..].contains(COLON);
508 }
509 return false;
510 }
511 content.contains(COLON)
512}