1use crate::async_reader::AsyncLineReader;
105use crate::error::{StreamError, StreamResult};
106use crate::event::{HeaderInfo, NodeEvent, NodeInfo};
107use crate::parser::{strip_comment, StreamingParserConfig};
108use hedl_core::lex::{calculate_indent, is_valid_key_token, is_valid_type_name};
109use hedl_core::Value;
110use std::future::Future;
111use std::pin::Pin;
112use std::task::{Context as TaskContext, Poll};
113use std::time::Instant;
114use tokio::io::AsyncRead;
115
/// `(type_name, schema, parent)` describing the list context a matrix row
/// belongs to; `parent` is the enclosing row's `(type, id)` for nested rows.
type ListContextResult = (String, Vec<String>, Option<(String, String)>);
118
/// Pull-based streaming parser over an async byte source.
///
/// The header block (`%VERSION`, `%STRUCT`, `%ALIAS`, `%NEST`) is consumed
/// eagerly during construction; body events are produced one at a time by
/// `next_event`.
pub struct AsyncStreamingParser<R: AsyncRead + Unpin> {
    /// Buffered line source with push-back support.
    reader: AsyncLineReader<R>,
    config: StreamingParserConfig,
    /// Parsed header directives; `Some` once `parse_header` succeeds.
    header: Option<HeaderInfo>,
    /// Mutable context-stack state threaded between `next_event` calls.
    state: ParserState,
    /// Set once input is exhausted; remaining contexts drain via `finalize`.
    finished: bool,
    /// Set after a fatal error; subsequent calls yield `Ok(None)`.
    errored: bool,
    /// Guards emission of the single `EndOfDocument` event.
    sent_end_of_document: bool,
    /// Construction time, used for timeout enforcement.
    start_time: Instant,
    /// Iteration counter so the timeout is only checked every 100 operations.
    operations_count: usize,
}
190
/// Mutable parsing state carried across `next_event` calls.
#[derive(Debug)]
struct ParserState {
    /// Stack of open scopes; index 0 is always `Context::Root`.
    stack: Vec<Context>,
    /// Values of the previous matrix row, used to resolve `^` cells.
    prev_row: Option<Vec<Value>>,
}
198
/// One open structural scope on the context stack.
#[derive(Debug, Clone)]
enum Context {
    /// Sentinel bottom-of-stack entry; never popped.
    Root,
    /// An object opened by a bare `key:` line.
    Object {
        #[allow(dead_code)]
        key: String,
        /// Indent level of the `key:` line itself.
        indent: usize,
    },
    /// A list opened by `key: @Type` or `key: @Type[cols...]`.
    List {
        key: String,
        type_name: String,
        /// Column names each row must match in arity.
        schema: Vec<String>,
        /// Indent level expected for this list's rows.
        row_indent: usize,
        /// Rows emitted so far; reported in the `ListEnd` event.
        count: usize,
        /// `(type, id)` of the most recent row — the parent for nested rows.
        last_node: Option<(String, String)>,
    },
}
216
217impl<R: AsyncRead + Unpin> AsyncStreamingParser<R> {
    /// Create a parser with the default configuration.
    ///
    /// The document header is parsed immediately, so this fails if the input
    /// has no valid `%VERSION` directive.
    pub async fn new(reader: R) -> StreamResult<Self> {
        Self::with_config(reader, StreamingParserConfig::default()).await
    }
278
    /// Create a parser with an explicit configuration and eagerly consume the
    /// header block.
    ///
    /// # Errors
    ///
    /// Propagates any header-parsing failure, including a missing `%VERSION`.
    pub async fn with_config(reader: R, config: StreamingParserConfig) -> StreamResult<Self> {
        let mut parser = Self {
            reader: AsyncLineReader::with_capacity(reader, config.buffer_size),
            config,
            header: None,
            state: ParserState {
                // The root context stays on the stack for the parser's lifetime.
                stack: vec![Context::Root],
                prev_row: None,
            },
            finished: false,
            errored: false,
            sent_end_of_document: false,
            start_time: Instant::now(),
            operations_count: 0,
        };

        parser.parse_header().await?;

        Ok(parser)
    }
328
329 #[inline]
331 fn check_timeout(&self) -> StreamResult<()> {
332 if let Some(timeout) = self.config.timeout {
333 let elapsed = self.start_time.elapsed();
334 if elapsed > timeout {
335 return Err(StreamError::Timeout {
336 elapsed,
337 limit: timeout,
338 });
339 }
340 }
341 Ok(())
342 }
343
    /// Mark the stream as terminally failed and propagate `e`.
    ///
    /// After this, `next_event` returns `Ok(None)` (the `errored` flag is
    /// checked first on every call).
    #[inline]
    fn return_error<T>(&mut self, e: StreamError) -> StreamResult<T> {
        self.finished = true;
        self.errored = true;
        Err(e)
    }
354
    /// Header information parsed during construction.
    pub fn header(&self) -> Option<&HeaderInfo> {
        self.header.as_ref()
    }
386
    /// Pull the next event from the stream.
    ///
    /// Returns `Ok(None)` once the document has been fully consumed (after a
    /// single `EndOfDocument` event) or after a fatal error was already
    /// reported.
    pub async fn next_event(&mut self) -> StreamResult<Option<NodeEvent>> {
        if self.errored {
            // A fatal error was already returned; the stream stays terminated.
            return Ok(None);
        }
        if self.finished {
            // Input exhausted: drain still-open contexts, then EndOfDocument.
            return self.finalize();
        }

        loop {
            self.operations_count += 1;
            // Amortize the wall-clock read: only test the timeout every 100
            // iterations.
            if self.operations_count % 100 == 0 {
                if let Err(e) = self.check_timeout() {
                    return self.return_error(e);
                }
            }

            let (line_num, line) = match self.reader.next_line().await {
                Ok(Some(l)) => l,
                Ok(None) => {
                    self.finished = true;
                    return self.finalize();
                }
                Err(e) => return self.return_error(e),
            };

            let trimmed = line.trim();

            // Skip blank lines and full-line comments.
            if trimmed.is_empty() || trimmed.starts_with('#') {
                continue;
            }

            let indent_info = match calculate_indent(&line, line_num as u32) {
                Ok(info) => info,
                Err(e) => return self.return_error(StreamError::syntax(line_num, e.to_string())),
            };

            let (indent, content) = match indent_info {
                Some(info) => (info.level, &line[info.spaces..]),
                None => continue,
            };

            if indent > self.config.max_indent_depth {
                return self.return_error(StreamError::syntax(
                    line_num,
                    format!("indent depth {indent} exceeds limit"),
                ));
            }

            // Close any contexts this line's indentation stepped out of. Only
            // one close event can be returned per call, so the triggering line
            // is pushed back and re-read on the next invocation.
            let events = match self.pop_contexts(indent) {
                Ok(e) => e,
                Err(e) => return self.return_error(e),
            };
            if let Some(event) = events {
                self.reader.push_back(line_num, line);
                return Ok(Some(event));
            }

            return match self.parse_line(content, indent, line_num) {
                Ok(result) => Ok(result),
                Err(e) => self.return_error(e),
            };
        }
    }
496
497 async fn parse_header(&mut self) -> StreamResult<()> {
498 let mut header = HeaderInfo::new();
499 let mut found_version = false;
500 let mut _found_separator = false;
501
502 while let Some((line_num, line)) = self.reader.next_line().await? {
503 self.check_timeout()?;
504
505 let trimmed = line.trim();
506
507 if trimmed.is_empty() || trimmed.starts_with('#') {
508 continue;
509 }
510
511 if trimmed == "---" {
512 _found_separator = true;
513 break;
514 }
515
516 if trimmed.starts_with('%') {
517 self.parse_directive(trimmed, line_num, &mut header, &mut found_version)?;
518 } else {
519 self.reader.push_back(line_num, line);
520 break;
521 }
522 }
523
524 if !found_version {
525 return Err(StreamError::MissingVersion);
526 }
527
528 self.header = Some(header);
529 Ok(())
530 }
531
532 fn parse_directive(
533 &self,
534 line: &str,
535 line_num: usize,
536 header: &mut HeaderInfo,
537 found_version: &mut bool,
538 ) -> StreamResult<()> {
539 if line.starts_with("%VERSION") {
540 self.parse_version_directive(line, header, found_version)
541 } else if line.starts_with("%STRUCT") {
542 self.parse_struct_directive(line, line_num, header)
543 } else if line.starts_with("%ALIAS") {
544 self.parse_alias_directive(line, line_num, header)
545 } else if line.starts_with("%NEST") {
546 self.parse_nest_directive(line, line_num, header)
547 } else {
548 Ok(())
549 }
550 }
551
552 fn strip_inline_comment(text: &str) -> &str {
557 let mut in_quotes = false;
558 let mut in_brackets = 0;
559 let mut quote_char = '"';
560
561 for (i, c) in text.char_indices() {
562 match c {
563 '"' | '\'' if !in_quotes => {
564 in_quotes = true;
565 quote_char = c;
566 }
567 c if in_quotes && c == quote_char => {
568 in_quotes = false;
569 }
570 '[' if !in_quotes => in_brackets += 1,
571 ']' if !in_quotes && in_brackets > 0 => in_brackets -= 1,
572 '#' if !in_quotes && in_brackets == 0 => {
573 return text[..i].trim_end();
574 }
575 _ => {}
576 }
577 }
578 text
579 }
580
581 fn parse_version_directive(
582 &self,
583 line: &str,
584 header: &mut HeaderInfo,
585 found_version: &mut bool,
586 ) -> StreamResult<()> {
587 let line = Self::strip_inline_comment(line);
589 let rest = line.strip_prefix("%VERSION").expect("prefix exists").trim();
590 let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
591 let parts: Vec<&str> = rest.split('.').collect();
592
593 if parts.len() != 2 {
594 return Err(StreamError::InvalidVersion(rest.to_string()));
595 }
596
597 let major: u32 = parts[0]
598 .parse()
599 .map_err(|_| StreamError::InvalidVersion(rest.to_string()))?;
600 let minor: u32 = parts[1]
601 .parse()
602 .map_err(|_| StreamError::InvalidVersion(rest.to_string()))?;
603
604 header.version = (major, minor);
605 *found_version = true;
606 Ok(())
607 }
608
609 fn parse_struct_directive(
610 &self,
611 line: &str,
612 line_num: usize,
613 header: &mut HeaderInfo,
614 ) -> StreamResult<()> {
615 let line = Self::strip_inline_comment(line);
617 let rest = line.strip_prefix("%STRUCT").expect("prefix exists").trim();
618 let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
619
620 let bracket_start = rest
621 .find('[')
622 .ok_or_else(|| StreamError::syntax(line_num, "missing '[' in %STRUCT"))?;
623 let bracket_end = rest
624 .find(']')
625 .ok_or_else(|| StreamError::syntax(line_num, "missing ']' in %STRUCT"))?;
626
627 let type_part = rest[..bracket_start].trim().trim_end_matches(':').trim();
628 let type_name = if let Some(paren_pos) = type_part.find('(') {
629 type_part[..paren_pos].trim()
630 } else {
631 type_part
632 };
633 if !is_valid_type_name(type_name) {
634 return Err(StreamError::syntax(
635 line_num,
636 format!("invalid type name: {type_name}"),
637 ));
638 }
639
640 let cols_str = &rest[bracket_start + 1..bracket_end];
641 let columns: Vec<String> = cols_str
642 .split(',')
643 .map(|s| s.trim().to_string())
644 .filter(|s| !s.is_empty())
645 .collect();
646
647 if columns.is_empty() {
648 return Err(StreamError::syntax(line_num, "empty schema"));
649 }
650
651 header.structs.insert(type_name.to_string(), columns);
652 Ok(())
653 }
654
655 fn parse_alias_directive(
656 &self,
657 line: &str,
658 line_num: usize,
659 header: &mut HeaderInfo,
660 ) -> StreamResult<()> {
661 let line = Self::strip_inline_comment(line);
663 let rest = line.strip_prefix("%ALIAS").expect("prefix exists").trim();
664 let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
665
666 let sep_pos = rest
667 .find('=')
668 .or_else(|| rest.find(':'))
669 .ok_or_else(|| StreamError::syntax(line_num, "missing '=' or ':' in %ALIAS"))?;
670
671 let alias = rest[..sep_pos].trim();
672 let value = rest[sep_pos + 1..].trim().trim_matches('"');
673
674 header.aliases.insert(alias.to_string(), value.to_string());
675 Ok(())
676 }
677
678 fn parse_nest_directive(
679 &self,
680 line: &str,
681 line_num: usize,
682 header: &mut HeaderInfo,
683 ) -> StreamResult<()> {
684 let line = Self::strip_inline_comment(line);
686 let rest = line.strip_prefix("%NEST").expect("prefix exists").trim();
687 let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
688
689 let arrow_pos = rest
690 .find('>')
691 .ok_or_else(|| StreamError::syntax(line_num, "missing '>' in %NEST"))?;
692
693 let parent = rest[..arrow_pos].trim();
694 let child = rest[arrow_pos + 1..].trim();
695
696 if !is_valid_type_name(parent) || !is_valid_type_name(child) {
697 return Err(StreamError::syntax(line_num, "invalid type name in %NEST"));
698 }
699
700 header.nests.insert(parent.to_string(), child.to_string());
701 Ok(())
702 }
703
    /// Close contexts that `current_indent` has stepped out of.
    ///
    /// At most one close event is produced per call; the caller pushes the
    /// triggering line back and re-reads it, so dedenting out of several
    /// scopes emits the corresponding end events over successive calls.
    fn pop_contexts(&mut self, current_indent: usize) -> StreamResult<Option<NodeEvent>> {
        while self.state.stack.len() > 1 {
            let should_pop = match self.state.stack.last().expect("stack has elements") {
                Context::Root => false,
                // Object children must be strictly deeper than the object line.
                Context::Object { indent, .. } => current_indent <= *indent,
                // List rows sit exactly at `row_indent`; anything shallower
                // closes the list.
                Context::List { row_indent, .. } => current_indent < *row_indent,
            };

            if should_pop {
                let ctx = self.state.stack.pop().expect("stack has elements");
                match ctx {
                    Context::List {
                        key,
                        type_name,
                        count,
                        ..
                    } => {
                        return Ok(Some(NodeEvent::ListEnd {
                            key,
                            type_name,
                            count,
                        }));
                    }
                    Context::Object { key, .. } => {
                        return Ok(Some(NodeEvent::ObjectEnd { key }));
                    }
                    Context::Root => {
                        // Unreachable in practice: the loop condition keeps the
                        // root entry on the stack.
                    }
                }
            } else {
                break;
            }
        }

        Ok(None)
    }
741
    /// Parse one non-blank body line into its event.
    ///
    /// Recognizes, in order: matrix rows (`| ...`), object openers (`key:`),
    /// list openers (`key: @Type...`), and scalar entries (`key: value`).
    /// A line without a ':' is a syntax error.
    fn parse_line(
        &mut self,
        content: &str,
        indent: usize,
        line_num: usize,
    ) -> StreamResult<Option<NodeEvent>> {
        let content = strip_comment(content);

        if let Some(row_content) = content.strip_prefix('|') {
            self.parse_matrix_row(row_content, indent, line_num)
        } else if let Some(colon_pos) = content.find(':') {
            let key = content[..colon_pos].trim();
            let after_colon = &content[colon_pos + 1..];

            if !is_valid_key_token(key) {
                return Err(StreamError::syntax(line_num, format!("invalid key: {key}")));
            }

            let after_colon_trimmed = after_colon.trim();

            if after_colon_trimmed.is_empty() {
                // Bare `key:` opens a nested object scope.
                self.state.stack.push(Context::Object {
                    key: key.to_string(),
                    indent,
                });
                Ok(Some(NodeEvent::ObjectStart {
                    key: key.to_string(),
                    line: line_num,
                }))
            } else if after_colon_trimmed.starts_with('@')
                && self.is_list_start(after_colon_trimmed)
            {
                let (type_name, schema) = self.parse_list_start(after_colon_trimmed, line_num)?;

                // Rows of this list are expected one indent level deeper.
                self.state.stack.push(Context::List {
                    key: key.to_string(),
                    type_name: type_name.clone(),
                    schema: schema.clone(),
                    row_indent: indent + 1,
                    count: 0,
                    last_node: None,
                });

                // `^` continuation cells must not leak across lists.
                self.state.prev_row = None;

                Ok(Some(NodeEvent::ListStart {
                    key: key.to_string(),
                    type_name,
                    schema,
                    line: line_num,
                }))
            } else {
                // Anything else after the colon is a scalar value.
                let value = self.infer_value(after_colon.trim(), line_num)?;
                Ok(Some(NodeEvent::Scalar {
                    key: key.to_string(),
                    value,
                    line: line_num,
                }))
            }
        } else {
            Err(StreamError::syntax(line_num, "expected ':' in line"))
        }
    }
805
806 #[inline]
807 fn is_list_start(&self, s: &str) -> bool {
808 let s = s.trim();
809 if !s.starts_with('@') {
810 return false;
811 }
812 let rest = &s[1..];
813 let type_end = rest
814 .find(|c: char| c == '[' || c.is_whitespace())
815 .unwrap_or(rest.len());
816 let type_name = &rest[..type_end];
817 is_valid_type_name(type_name)
818 }
819
820 fn parse_list_start(&self, s: &str, line_num: usize) -> StreamResult<(String, Vec<String>)> {
821 let s = s.trim();
822 let rest = &s[1..];
823
824 if let Some(bracket_pos) = rest.find('[') {
825 let type_name = &rest[..bracket_pos];
826 if !is_valid_type_name(type_name) {
827 return Err(StreamError::syntax(
828 line_num,
829 format!("invalid type name: {type_name}"),
830 ));
831 }
832
833 let bracket_end = rest
834 .find(']')
835 .ok_or_else(|| StreamError::syntax(line_num, "missing ']'"))?;
836
837 let cols_str = &rest[bracket_pos + 1..bracket_end];
838 let columns: Vec<String> = cols_str
839 .split(',')
840 .map(|s| s.trim().to_string())
841 .filter(|s| !s.is_empty())
842 .collect();
843
844 Ok((type_name.to_string(), columns))
845 } else {
846 let type_name = rest.trim();
847 if !is_valid_type_name(type_name) {
848 return Err(StreamError::syntax(
849 line_num,
850 format!("invalid type name: {type_name}"),
851 ));
852 }
853
854 let header = self
855 .header
856 .as_ref()
857 .ok_or_else(|| StreamError::Header("header not parsed".to_string()))?;
858
859 let schema = header.structs.get(type_name).ok_or_else(|| {
860 StreamError::schema(line_num, format!("undefined type: {type_name}"))
861 })?;
862
863 Ok((type_name.to_string(), schema.clone()))
864 }
865 }
866
    /// Parse a `| a, b, c` matrix row into a `Node` event.
    ///
    /// # Errors
    ///
    /// Shape mismatch when the row's arity differs from the schema; syntax
    /// errors for CSV failures or a non-string ID column; orphan-row errors
    /// from `find_list_context`.
    fn parse_matrix_row(
        &mut self,
        content: &str,
        indent: usize,
        line_num: usize,
    ) -> StreamResult<Option<NodeEvent>> {
        let content = strip_comment(content).trim();

        // Locate (or implicitly open) the list this row belongs to.
        let (type_name, schema, parent_info) = self.find_list_context(indent, line_num)?;

        let fields = hedl_core::lex::parse_csv_row(content)
            .map_err(|e| StreamError::syntax(line_num, format!("row parse error: {e}")))?;

        // Row arity must match the schema exactly.
        if fields.len() != schema.len() {
            return Err(StreamError::ShapeMismatch {
                line: line_num,
                expected: schema.len(),
                got: fields.len(),
            });
        }

        let mut values = Vec::with_capacity(fields.len());
        for (col_idx, field) in fields.iter().enumerate() {
            let value = if field.value == "^" {
                // `^` repeats the same column of the previous row; Null when
                // there is no previous row (or that row lacked the column).
                self.state
                    .prev_row
                    .as_ref()
                    .and_then(|prev| prev.get(col_idx).cloned())
                    .unwrap_or(Value::Null)
            } else if field.is_quoted {
                // Quoted cells are always strings — never type-inferred.
                Value::String(field.value.clone().into())
            } else {
                self.infer_value(&field.value, line_num)?
            };
            values.push(value);
        }

        // The first column is the node's identifier and must be a string.
        let id = match &values[0] {
            Value::String(s) => s.clone(),
            _ => return Err(StreamError::syntax(line_num, "ID column must be a string")),
        };

        self.update_list_context(&type_name, &id);
        self.state.prev_row = Some(values.clone());

        // Depth counts enclosing list contexts, excluding the row's own list.
        let depth = self
            .state
            .stack
            .iter()
            .filter(|ctx| matches!(ctx, Context::List { .. }))
            .count()
            .saturating_sub(1);

        let mut node = NodeInfo::new(type_name.clone(), id.to_string(), values, depth, line_num);

        if let Some((parent_type, parent_id)) = parent_info {
            node = node.with_parent(parent_type, parent_id);
        }

        Ok(Some(NodeEvent::Node(node)))
    }
929
    /// Find the list context a row at `indent` belongs to.
    ///
    /// A row at the innermost list's `row_indent` is an ordinary row; a row
    /// one level deeper implicitly opens a child list driven by the header's
    /// `%NEST` rules. Returns `(type_name, schema, parent)` where `parent` is
    /// the enclosing row's `(type, id)` for nested rows.
    fn find_list_context(
        &mut self,
        indent: usize,
        line_num: usize,
    ) -> StreamResult<ListContextResult> {
        let header = self
            .header
            .as_ref()
            .ok_or_else(|| StreamError::Header("header not parsed".to_string()))?;

        // Scan from the innermost context outward.
        for ctx in self.state.stack.iter().rev() {
            if let Context::List {
                type_name,
                schema,
                row_indent,
                last_node,
                ..
            } = ctx
            {
                if indent == *row_indent {
                    // Ordinary row of this list.
                    return Ok((type_name.clone(), schema.clone(), None));
                } else if indent == *row_indent + 1 {
                    // One level deeper: a nested child row. It needs a parent
                    // row to attach to...
                    let parent_info = last_node.clone().ok_or_else(|| {
                        StreamError::orphan_row(line_num, "child row has no parent")
                    })?;

                    // ...a %NEST rule naming the child type...
                    let child_type = header.nests.get(type_name).ok_or_else(|| {
                        StreamError::orphan_row(
                            line_num,
                            format!("no NEST rule for parent type '{type_name}'"),
                        )
                    })?;

                    // ...and a %STRUCT schema for that child type.
                    let child_schema = header.structs.get(child_type).ok_or_else(|| {
                        StreamError::schema(
                            line_num,
                            format!("child type '{child_type}' not defined"),
                        )
                    })?;

                    // Open the child list implicitly; pop_contexts closes it
                    // again when the indentation recedes.
                    self.state.stack.push(Context::List {
                        key: child_type.clone(),
                        type_name: child_type.clone(),
                        schema: child_schema.clone(),
                        row_indent: indent,
                        count: 0,
                        last_node: None,
                    });

                    return Ok((child_type.clone(), child_schema.clone(), Some(parent_info)));
                }
            }
        }

        Err(StreamError::syntax(
            line_num,
            "matrix row outside of list context",
        ))
    }
989
990 fn update_list_context(&mut self, type_name: &str, id: &str) {
991 for ctx in self.state.stack.iter_mut().rev() {
992 if let Context::List {
993 type_name: ctx_type,
994 last_node,
995 count,
996 ..
997 } = ctx
998 {
999 if ctx_type == type_name {
1000 *last_node = Some((type_name.to_string(), id.to_string()));
1001 *count += 1;
1002 break;
1003 }
1004 }
1005 }
1006 }
1007
    /// Infer a scalar `Value` from raw (unquoted) text.
    ///
    /// Recognition order matters: null (`~`/empty), booleans, `@[Type:]id`
    /// references, `$alias` expansions (falling back to the literal text when
    /// the alias is unknown), integers, floats, then plain strings.
    #[inline]
    fn infer_value(&self, s: &str, _line_num: usize) -> StreamResult<Value> {
        let s = s.trim();

        if s.is_empty() || s == "~" {
            return Ok(Value::Null);
        }

        if s == "true" {
            return Ok(Value::Bool(true));
        }
        if s == "false" {
            return Ok(Value::Bool(false));
        }

        if let Some(ref_part) = s.strip_prefix('@') {
            if let Some(colon_pos) = ref_part.find(':') {
                // Typed reference: `@Type:id`.
                let type_name = &ref_part[..colon_pos];
                let id = &ref_part[colon_pos + 1..];
                return Ok(Value::Reference(hedl_core::Reference {
                    type_name: Some(type_name.to_string().into()),
                    id: id.to_string().into(),
                }));
            }
            // Untyped reference: `@id`.
            return Ok(Value::Reference(hedl_core::Reference {
                type_name: None,
                id: ref_part.to_string().into(),
            }));
        }

        if let Some(alias) = s.strip_prefix('$') {
            if let Some(header) = &self.header {
                if let Some(value) = header.aliases.get(alias) {
                    return Ok(Value::String(value.clone().into()));
                }
            }
            // Unknown alias: keep the literal `$...` text as a string.
            return Ok(Value::String(s.to_string().into()));
        }

        // Integer inference is attempted before float so "42" stays an Int.
        if let Ok(i) = s.parse::<i64>() {
            return Ok(Value::Int(i));
        }
        if let Ok(f) = s.parse::<f64>() {
            return Ok(Value::Float(f));
        }

        Ok(Value::String(s.to_string().into()))
    }
1056
    /// Drain remaining contexts after input is exhausted.
    ///
    /// Emits one `ListEnd`/`ObjectEnd` per call until only the root context
    /// remains, then a single `EndOfDocument`, then `None` forever.
    fn finalize(&mut self) -> StreamResult<Option<NodeEvent>> {
        if self.sent_end_of_document {
            return Ok(None);
        }

        while self.state.stack.len() > 1 {
            let ctx = self.state.stack.pop().expect("stack has elements");
            match ctx {
                Context::List {
                    key,
                    type_name,
                    count,
                    ..
                } => {
                    return Ok(Some(NodeEvent::ListEnd {
                        key,
                        type_name,
                        count,
                    }));
                }
                Context::Object { key, .. } => {
                    return Ok(Some(NodeEvent::ObjectEnd { key }));
                }
                Context::Root => {
                    // Unreachable: the loop condition keeps the root entry.
                }
            }
        }

        self.sent_end_of_document = true;
        Ok(Some(NodeEvent::EndOfDocument))
    }
1091
1092 pub async fn next_batch(&mut self, n: usize) -> StreamResult<Vec<NodeEvent>> {
1133 let mut batch = Vec::with_capacity(n.min(100)); for _ in 0..n {
1135 match self.next_event().await? {
1136 Some(NodeEvent::EndOfDocument) => break,
1137 Some(event) => batch.push(event),
1138 None => break,
1139 }
1140 }
1141 Ok(batch)
1142 }
1143
    /// Like `next_event`, but resolves to `Ok(None)` as soon as `cancel_rx`
    /// observes a `true` value.
    ///
    /// NOTE(review): when cancellation wins the `select!`, the in-flight
    /// `next_event` future is dropped mid-poll; this assumes `next_event` is
    /// safe to abandon (resumable state lives in the buffered reader) —
    /// confirm against `AsyncLineReader` before resuming a cancelled parser.
    #[cfg(feature = "async")]
    pub async fn next_event_cancellable(
        &mut self,
        cancel_rx: &mut tokio::sync::watch::Receiver<bool>,
    ) -> StreamResult<Option<NodeEvent>> {
        // Fast path: already cancelled before doing any work.
        if *cancel_rx.borrow() {
            return Ok(None);
        }

        tokio::select! {
            result = self.next_event() => result,
            _ = cancel_rx.changed() => {
                if *cancel_rx.borrow() {
                    Ok(None)
                } else {
                    // The watch value changed but is not `true` (or the sender
                    // was dropped); resume normal parsing.
                    self.next_event().await
                }
            }
        }
    }
1203}
1204
/// `futures_core::Stream` adapter over the pull-based parser.
///
/// `EndOfDocument` is translated into stream termination (`None`) so
/// combinators like `collect`/`count` stop cleanly.
///
/// NOTE(review): a fresh `next_event` future is created on every poll and
/// dropped again if it returns `Pending`; this presumes all resumable state
/// lives in the parser/reader rather than in the future itself — confirm
/// against `AsyncLineReader`'s buffering behavior.
#[cfg(feature = "async")]
impl<R: AsyncRead + Unpin> futures_core::Stream for AsyncStreamingParser<R> {
    type Item = StreamResult<NodeEvent>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Option<Self::Item>> {
        let fut = self.next_event();
        tokio::pin!(fut);

        match fut.poll(cx) {
            Poll::Ready(Ok(Some(NodeEvent::EndOfDocument))) => Poll::Ready(None),
            Poll::Ready(Ok(Some(event))) => Poll::Ready(Some(Ok(event))),
            Poll::Ready(Ok(None)) => Poll::Ready(None),
            Poll::Ready(Err(e)) => Poll::Ready(Some(Err(e))),
            Poll::Pending => Poll::Pending,
        }
    }
}
1225
// Unit tests cover header parsing, event streaming, the Stream adapter,
// batching, cancellation, and concurrent use.
#[cfg(all(test, feature = "async"))]
mod tests {
    use super::*;
    use std::io::Cursor;
    use std::time::Duration;

    #[tokio::test]
    async fn test_parse_header() {
        let input = r#"
%VERSION: 1.0
%STRUCT: User: [id, name, email]
%ALIAS active = "Active"
%NEST: User > Order
---
"#;
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();
        let header = parser.header().unwrap();

        assert_eq!(header.version, (1, 0));
        assert!(header.structs.contains_key("User"));
        assert_eq!(header.aliases.get("active"), Some(&"Active".to_string()));
        assert_eq!(header.nests.get("User"), Some(&"Order".to_string()));
    }

    #[tokio::test]
    async fn test_streaming_nodes() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice Smith
  | bob, Bob Jones
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let mut events = Vec::new();
        while let Some(event) = parser.next_event().await.unwrap() {
            events.push(event);
        }

        let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
        assert_eq!(nodes.len(), 2);
        assert_eq!(nodes[0].id, "alice");
        assert_eq!(nodes[1].id, "bob");
    }

    #[tokio::test]
    async fn test_timeout() {
        let config = StreamingParserConfig {
            timeout: Some(Duration::from_millis(1)),
            ..Default::default()
        };

        let input = r"
%VERSION: 1.0
---
";
        // A tiny input finishes well inside even a 1ms budget.
        let parser = AsyncStreamingParser::with_config(Cursor::new(input), config).await;
        assert!(parser.is_ok());
    }

    #[tokio::test]
    async fn test_inline_schema() {
        let input = r"
%VERSION: 1.0
---
items: @Item[id, name]
  | item1, First
  | item2, Second
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let mut nodes = Vec::new();
        while let Some(event) = parser.next_event().await.unwrap() {
            if let NodeEvent::Node(node) = event {
                nodes.push(node);
            }
        }

        assert_eq!(nodes.len(), 2);
        assert_eq!(nodes[0].type_name, "Item");
    }

    #[tokio::test]
    async fn test_error_handling() {
        let input = r"
%VERSION: 1.0
---
invalid line without colon
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let result = parser.next_event().await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), StreamError::Syntax { .. }));
    }

    #[tokio::test]
    async fn test_unicode() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | 用户1, 张三
  | пользователь, Иван
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let mut nodes = Vec::new();
        while let Some(event) = parser.next_event().await.unwrap() {
            if let NodeEvent::Node(node) = event {
                nodes.push(node);
            }
        }

        assert_eq!(nodes.len(), 2);
        assert_eq!(nodes[0].id, "用户1");
        assert_eq!(nodes[1].id, "пользователь");
    }

    #[tokio::test]
    async fn test_stream_trait_basic() {
        use futures::StreamExt;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob, Bob
";
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let events: Vec<_> = parser.collect().await;
        assert!(events.iter().all(std::result::Result::is_ok));

        let nodes: Vec<_> = events
            .iter()
            .filter_map(|e| e.as_ref().ok())
            .filter_map(|e| e.as_node())
            .collect();

        assert_eq!(nodes.len(), 2);
        assert_eq!(nodes[0].id, "alice");
        assert_eq!(nodes[1].id, "bob");
    }

    #[tokio::test]
    async fn test_stream_trait_filter_map() {
        use futures::StreamExt;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name, active]
---
users: @User
  | alice, Alice, true
  | bob, Bob, false
  | carol, Carol, true
";
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let active_nodes: Vec<_> = parser
            .filter_map(|result| async move {
                result.ok().and_then(|event| {
                    if let NodeEvent::Node(node) = event {
                        Some(node)
                    } else {
                        None
                    }
                })
            })
            .filter(|node| {
                let is_active = matches!(node.get_field(2), Some(Value::Bool(true)));
                async move { is_active }
            })
            .collect()
            .await;

        assert_eq!(active_nodes.len(), 2);
        assert_eq!(active_nodes[0].id, "alice");
        assert_eq!(active_nodes[1].id, "carol");
    }

    #[tokio::test]
    async fn test_stream_trait_take() {
        use futures::StreamExt;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob, Bob
  | carol, Carol
  | dave, Dave
";
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let nodes: Vec<_> = parser
            .filter_map(|result| async move {
                result.ok().and_then(|event| {
                    if let NodeEvent::Node(node) = event {
                        Some(node)
                    } else {
                        None
                    }
                })
            })
            .take(2)
            .collect()
            .await;

        assert_eq!(nodes.len(), 2);
        assert_eq!(nodes[0].id, "alice");
        assert_eq!(nodes[1].id, "bob");
    }

    #[tokio::test]
    async fn test_stream_trait_count() {
        use futures::StreamExt;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob, Bob
  | carol, Carol
";
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        // ListStart + 3 nodes + ListEnd = 5 (EndOfDocument ends the stream).
        let total = parser.count().await;
        assert_eq!(total, 5);
    }

    #[tokio::test]
    async fn test_next_batch_basic() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob, Bob
  | carol, Carol
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        // ListStart + 3 nodes + ListEnd = 5; EndOfDocument is not batched.
        let batch = parser.next_batch(10).await.unwrap();
        assert_eq!(batch.len(), 5);

        let batch = parser.next_batch(10).await.unwrap();
        assert!(batch.is_empty());
    }

    #[tokio::test]
    async fn test_next_batch_incremental() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob, Bob
  | carol, Carol
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let batch1 = parser.next_batch(2).await.unwrap();
        assert_eq!(batch1.len(), 2);

        let batch2 = parser.next_batch(2).await.unwrap();
        assert_eq!(batch2.len(), 2);

        let batch3 = parser.next_batch(2).await.unwrap();
        assert_eq!(batch3.len(), 1);

        let batch4 = parser.next_batch(2).await.unwrap();
        assert!(batch4.is_empty());
    }

    #[tokio::test]
    async fn test_next_batch_empty_file() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let batch = parser.next_batch(10).await.unwrap();
        assert!(batch.is_empty());
    }

    #[tokio::test]
    async fn test_next_batch_large() {
        let mut input = String::from(
            r"
%VERSION: 1.0
%STRUCT: Data: [id, value]
---
data: @Data
",
        );
        for i in 0..500 {
            input.push_str(&format!("  | row{i}, value{i}\n"));
        }

        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let batch1 = parser.next_batch(100).await.unwrap();
        assert_eq!(batch1.len(), 100);

        let batch2 = parser.next_batch(100).await.unwrap();
        assert_eq!(batch2.len(), 100);

        let mut total = batch1.len() + batch2.len();
        loop {
            let batch = parser.next_batch(100).await.unwrap();
            if batch.is_empty() {
                break;
            }
            total += batch.len();
        }

        // ListStart + 500 nodes + ListEnd.
        assert_eq!(total, 502);
    }

    #[tokio::test]
    async fn test_cancellation_basic() {
        use tokio::sync::watch;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob, Bob
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let (cancel_tx, mut cancel_rx) = watch::channel(false);

        let event1 = parser.next_event_cancellable(&mut cancel_rx).await.unwrap();
        assert!(event1.is_some());

        cancel_tx.send(true).unwrap();

        let event2 = parser.next_event_cancellable(&mut cancel_rx).await.unwrap();
        assert!(event2.is_none());
    }

    #[tokio::test]
    async fn test_cancellation_not_cancelled() {
        use tokio::sync::watch;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let (_cancel_tx, mut cancel_rx) = watch::channel(false);

        let mut count = 0;
        while let Some(_event) = parser.next_event_cancellable(&mut cancel_rx).await.unwrap() {
            count += 1;
        }

        // ListStart + node + ListEnd + EndOfDocument.
        assert_eq!(count, 4);
    }

    #[tokio::test]
    async fn test_cancellation_during_processing() {
        use tokio::sync::watch;

        let mut input = String::from(
            r"
%VERSION: 1.0
%STRUCT: Data: [id]
---
data: @Data
",
        );
        for i in 0..1000 {
            input.push_str(&format!("  | row{i}\n"));
        }

        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let (cancel_tx, mut cancel_rx) = watch::channel(false);

        // Fire the cancellation signal while events are still being consumed.
        tokio::spawn(async move {
            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
            cancel_tx.send(true).unwrap();
        });

        let mut count = 0;
        while let Some(_event) = parser.next_event_cancellable(&mut cancel_rx).await.unwrap() {
            count += 1;
            tokio::time::sleep(tokio::time::Duration::from_micros(10)).await;
        }

        assert!(count < 1002);
        assert!(count > 0);
    }

    #[tokio::test]
    async fn test_concurrent_file_processing() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob, Bob
";

        let tasks: Vec<_> = (0..5)
            .map(|_| {
                let input_clone = input.to_string();
                tokio::spawn(async move {
                    let mut parser = AsyncStreamingParser::new(Cursor::new(input_clone))
                        .await
                        .unwrap();

                    let mut count = 0;
                    while let Some(_event) = parser.next_event().await.unwrap() {
                        count += 1;
                    }
                    count
                })
            })
            .collect();

        let results = futures::future::join_all(tasks).await;

        for result in results {
            // ListStart + 2 nodes + ListEnd + EndOfDocument per task.
            assert_eq!(result.unwrap(), 5);
        }
    }

    #[tokio::test]
    async fn test_concurrent_with_stream_trait() {
        use futures::StreamExt;

        let input = r"
%VERSION: 1.0
%STRUCT: Data: [id]
---
data: @Data
  | row1
  | row2
  | row3
";

        let mut counts = Vec::new();

        for _ in 0..10 {
            let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

            let count = parser
                .filter_map(|result| async move {
                    result.ok().and_then(|event| {
                        if let NodeEvent::Node(_) = event {
                            Some(())
                        } else {
                            None
                        }
                    })
                })
                .count()
                .await;

            counts.push(count);
        }

        for count in counts {
            assert_eq!(count, 3);
        }
    }

    #[tokio::test]
    async fn test_stream_trait_with_errors() {
        use futures::StreamExt;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob
  | carol, Carol
";
        let parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let results: Vec<_> = parser.collect().await;

        // The short "bob" row has the wrong arity and must surface as an error.
        let errors: Vec<_> = results.iter().filter(|r| r.is_err()).collect();
        assert!(!errors.is_empty());
    }

    #[tokio::test]
    async fn test_batch_with_mixed_events() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
%STRUCT: Product: [id, title]
---
users: @User
  | alice, Alice
products: @Product
  | prod1, Widget
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let batch = parser.next_batch(10).await.unwrap();

        // Two lists of one node each: 2 x (ListStart + Node + ListEnd).
        assert_eq!(batch.len(), 6);

        let list_starts: Vec<_> = batch
            .iter()
            .filter(|e| matches!(e, NodeEvent::ListStart { .. }))
            .collect();
        assert_eq!(list_starts.len(), 2);
    }

    #[tokio::test]
    async fn test_stream_empty_after_cancellation() {
        use tokio::sync::watch;

        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
---
users: @User
  | alice, Alice
  | bob, Bob
";
        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let (cancel_tx, mut cancel_rx) = watch::channel(false);

        let _event = parser.next_event_cancellable(&mut cancel_rx).await.unwrap();

        cancel_tx.send(true).unwrap();

        // After cancellation, every further call must return None.
        assert!(parser
            .next_event_cancellable(&mut cancel_rx)
            .await
            .unwrap()
            .is_none());
        assert!(parser
            .next_event_cancellable(&mut cancel_rx)
            .await
            .unwrap()
            .is_none());
    }

    #[tokio::test]
    async fn test_batch_reading_performance() {
        let mut input = String::from(
            r"
%VERSION: 1.0
%STRUCT: Data: [id, value]
---
data: @Data
",
        );
        for i in 0..1000 {
            input.push_str(&format!("  | row{i}, value{i}\n"));
        }

        let mut parser = AsyncStreamingParser::new(Cursor::new(input)).await.unwrap();

        let start = std::time::Instant::now();

        let mut total = 0;
        loop {
            let batch = parser.next_batch(100).await.unwrap();
            if batch.is_empty() {
                break;
            }
            total += batch.len();
        }

        let elapsed = start.elapsed();

        // ListStart + 1000 nodes + ListEnd.
        assert_eq!(total, 1002);
        assert!(elapsed.as_millis() < 100);
    }
}