1use crate::buffer_config::BufferSizeHint;
60use crate::buffer_pool::{BufferPool, MemoryLimits};
61use crate::error::{StreamError, StreamResult};
62use crate::event::{HeaderInfo, NodeEvent, NodeInfo};
63use crate::reader::LineReader;
64use hedl_core::lex::{calculate_indent, is_valid_key_token, is_valid_type_name};
65use hedl_core::Value;
66use std::io::Read;
67use std::time::{Duration, Instant};
68
/// Result of locating the list context for a matrix row:
/// `(row type name, schema columns, optional (parent_type, parent_id))`.
type ListContextResult = (String, Vec<String>, Option<(String, String)>);
71
/// Tunable limits and buffer settings for a streaming parse.
#[derive(Debug, Clone)]
pub struct StreamingParserConfig {
    /// Maximum accepted length of a single input line, in bytes.
    pub max_line_length: usize,

    /// Maximum allowed indentation depth before a syntax error is raised.
    pub max_indent_depth: usize,

    /// Capacity of the internal line-reader buffer, in bytes.
    pub buffer_size: usize,

    /// Optional wall-clock deadline for the whole parse; `None` disables it.
    pub timeout: Option<Duration>,

    /// Memory budget (pool size, line-length cap, pooling switch).
    pub memory_limits: MemoryLimits,

    /// Requests a `BufferPool`; only honored when
    /// `memory_limits.enable_buffer_pooling` is also set.
    pub enable_pooling: bool,
}
176
177impl Default for StreamingParserConfig {
178 fn default() -> Self {
179 Self {
180 max_line_length: 1_000_000,
181 max_indent_depth: 100,
182 buffer_size: 64 * 1024,
183 timeout: None,
184 memory_limits: MemoryLimits::default(),
185 enable_pooling: false,
186 }
187 }
188}
189
impl StreamingParserConfig {
    /// Config with no line-length cap; all other fields keep their defaults.
    #[must_use]
    pub fn unlimited() -> Self {
        Self {
            max_line_length: usize::MAX,
            ..Default::default()
        }
    }

    /// Sets `buffer_size` from a [`BufferSizeHint`].
    #[must_use]
    pub fn with_buffer_hint(mut self, hint: BufferSizeHint) -> Self {
        self.buffer_size = hint.size();
        self
    }

    /// Enables or disables buffer pooling (see also `memory_limits`).
    #[must_use]
    pub fn with_buffer_pooling(mut self, enabled: bool) -> Self {
        self.enable_pooling = enabled;
        self
    }

    /// Installs `limits` and mirrors its line-length cap into
    /// `max_line_length`, overwriting any previously configured value.
    #[must_use]
    pub fn with_memory_limits(mut self, limits: MemoryLimits) -> Self {
        self.memory_limits = limits;
        self.max_line_length = limits.max_line_length;
        self
    }

    /// Sets only the pool-size limit inside `memory_limits`.
    #[must_use]
    pub fn with_pool_size(mut self, size: usize) -> Self {
        self.memory_limits.max_pool_size = size;
        self
    }
}
285
/// Pull-based streaming parser that yields [`NodeEvent`]s from any reader.
pub struct StreamingParser<R: Read> {
    /// Buffered line source with one-line push-back support.
    reader: LineReader<R>,
    /// Limits and tuning knobs captured at construction.
    config: StreamingParserConfig,
    /// Parsed header directives; set once `parse_header` succeeds.
    header: Option<HeaderInfo>,
    /// Context stack and previous-row state for the document body.
    state: ParserState,
    /// Input exhausted; only context unwinding remains.
    finished: bool,
    /// A fatal error was already reported; iteration yields nothing more.
    errored: bool,
    /// `EndOfDocument` has already been emitted.
    sent_end_of_document: bool,
    /// Construction time, used by the optional timeout check.
    start_time: Instant,
    /// Body-loop iteration counter; the timeout is polled every 100 iterations.
    operations_count: usize,
    /// Optional buffer pool (not read anywhere in this file).
    #[allow(dead_code)]
    buffer_pool: Option<BufferPool>,
}
503
/// Mutable state for body parsing.
#[derive(Debug)]
struct ParserState {
    /// Nesting contexts; always seeded with `Context::Root` at the bottom.
    stack: Vec<Context>,
    /// Values of the previous matrix row, used to resolve `^` ditto cells.
    prev_row: Option<Vec<Value>>,
}
511
/// One level of nesting in the document body.
#[derive(Debug, Clone)]
enum Context {
    /// Top level of the body; always the bottom of the stack.
    Root,
    /// Inside a `key:` object block.
    Object {
        #[allow(dead_code)]
        key: String,
        /// Indent level of the `key:` line itself.
        indent: usize,
    },
    /// Inside a `key: @Type` list block.
    List {
        /// Key that introduced the list.
        key: String,
        /// Node type of the rows.
        type_name: String,
        /// Column names each row must match.
        schema: Vec<String>,
        /// Indent level at which rows of this list appear.
        row_indent: usize,
        /// Number of rows seen so far.
        count: usize,
        /// `(type, id)` of the most recent row — the parent for nested rows.
        last_node: Option<(String, String)>,
    },
}
529
530impl<R: Read> StreamingParser<R> {
    /// Creates a parser with the default configuration, eagerly consuming
    /// the header before returning.
    ///
    /// # Errors
    /// Any header-parsing error (e.g. a missing `%VERSION` directive).
    pub fn new(reader: R) -> StreamResult<Self> {
        Self::with_config(reader, StreamingParserConfig::default())
    }
603
    /// Builds a parser with an explicit `config`, then eagerly parses the
    /// header so `header()` is populated immediately.
    ///
    /// # Errors
    /// Any header-parsing error (missing `%VERSION`, bad directive, I/O).
    pub fn with_config(reader: R, config: StreamingParserConfig) -> StreamResult<Self> {
        // A pool is created only when BOTH the config flag and the
        // memory-limits flag request it.
        let buffer_pool = if config.enable_pooling && config.memory_limits.enable_buffer_pooling {
            Some(BufferPool::new(config.memory_limits.max_pool_size))
        } else {
            None
        };

        let mut parser = Self {
            reader: LineReader::with_capacity_and_max_length(
                reader,
                config.buffer_size,
                config.max_line_length,
            ),
            config,
            header: None,
            state: ParserState {
                stack: vec![Context::Root],
                prev_row: None,
            },
            finished: false,
            errored: false,
            sent_end_of_document: false,
            start_time: Instant::now(),
            operations_count: 0,
            buffer_pool,
        };

        parser.parse_header()?;

        Ok(parser)
    }
728
729 #[inline]
732 fn check_timeout(&self) -> StreamResult<()> {
733 if let Some(timeout) = self.config.timeout {
734 let elapsed = self.start_time.elapsed();
735 if elapsed > timeout {
736 return Err(StreamError::Timeout {
737 elapsed,
738 limit: timeout,
739 });
740 }
741 }
742 Ok(())
743 }
744
    /// Parsed header info. Always `Some` for a parser obtained via
    /// `new`/`with_config`, which parse the header before returning.
    pub fn header(&self) -> Option<&HeaderInfo> {
        self.header.as_ref()
    }
827
    /// Consumes header lines up to `---` (or the first non-directive line,
    /// which is pushed back for body parsing), recording
    /// `%VERSION` / `%STRUCT` / `%ALIAS` / `%NEST` directives.
    ///
    /// # Errors
    /// `StreamError::MissingVersion` when no `%VERSION` was seen, plus any
    /// directive parse error, I/O error, or timeout.
    fn parse_header(&mut self) -> StreamResult<()> {
        let mut header = HeaderInfo::new();
        let mut found_version = false;
        let mut _found_separator = false;

        while let Some((line_num, line)) = self.reader.next_line()? {
            self.check_timeout()?;

            let trimmed = line.trim();

            // Blank lines and full-line comments are skipped.
            if trimmed.is_empty() || trimmed.starts_with('#') {
                continue;
            }

            // Explicit header/body separator.
            if trimmed == "---" {
                _found_separator = true;
                break;
            }

            if trimmed.starts_with('%') {
                self.parse_directive(trimmed, line_num, &mut header, &mut found_version)?;
            } else {
                // First body line: return it to the reader so the body loop
                // sees it again.
                self.reader.push_back(line_num, line);
                break;
            }
        }

        if !found_version {
            return Err(StreamError::MissingVersion);
        }

        self.header = Some(header);
        Ok(())
    }
868
869 fn parse_directive(
870 &self,
871 line: &str,
872 line_num: usize,
873 header: &mut HeaderInfo,
874 found_version: &mut bool,
875 ) -> StreamResult<()> {
876 if line.starts_with("%VERSION") {
877 self.parse_version_directive(line, header, found_version)
878 } else if line.starts_with("%STRUCT") {
879 self.parse_struct_directive(line, line_num, header)
880 } else if line.starts_with("%ALIAS") {
881 self.parse_alias_directive(line, line_num, header)
882 } else if line.starts_with("%NEST") {
883 self.parse_nest_directive(line, line_num, header)
884 } else {
885 Ok(())
886 }
887 }
888
889 fn strip_inline_comment(text: &str) -> &str {
894 let mut in_quotes = false;
895 let mut in_brackets = 0;
896 let mut quote_char = '"';
897
898 for (i, c) in text.char_indices() {
899 match c {
900 '"' | '\'' if !in_quotes => {
901 in_quotes = true;
902 quote_char = c;
903 }
904 c if in_quotes && c == quote_char => {
905 in_quotes = false;
906 }
907 '[' if !in_quotes => in_brackets += 1,
908 ']' if !in_quotes && in_brackets > 0 => in_brackets -= 1,
909 '#' if !in_quotes && in_brackets == 0 => {
910 return text[..i].trim_end();
911 }
912 _ => {}
913 }
914 }
915 text
916 }
917
918 fn parse_version_directive(
919 &self,
920 line: &str,
921 header: &mut HeaderInfo,
922 found_version: &mut bool,
923 ) -> StreamResult<()> {
924 let line = Self::strip_inline_comment(line);
926 let rest = line.strip_prefix("%VERSION").expect("prefix exists").trim();
928 let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
930 let parts: Vec<&str> = rest.split('.').collect();
931
932 if parts.len() != 2 {
933 return Err(StreamError::InvalidVersion(rest.to_string()));
934 }
935
936 let major: u32 = parts[0]
937 .parse()
938 .map_err(|_| StreamError::InvalidVersion(rest.to_string()))?;
939 let minor: u32 = parts[1]
940 .parse()
941 .map_err(|_| StreamError::InvalidVersion(rest.to_string()))?;
942
943 header.version = (major, minor);
944 *found_version = true;
945 Ok(())
946 }
947
948 fn parse_struct_directive(
949 &self,
950 line: &str,
951 line_num: usize,
952 header: &mut HeaderInfo,
953 ) -> StreamResult<()> {
954 let line = Self::strip_inline_comment(line);
956 let rest = line.strip_prefix("%STRUCT").expect("prefix exists").trim();
958 let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
960
961 let bracket_start = rest
962 .find('[')
963 .ok_or_else(|| StreamError::syntax(line_num, "missing '[' in %STRUCT"))?;
964 let bracket_end = rest
965 .find(']')
966 .ok_or_else(|| StreamError::syntax(line_num, "missing ']' in %STRUCT"))?;
967
968 let type_part = rest[..bracket_start].trim().trim_end_matches(':').trim();
971 let type_name = if let Some(paren_pos) = type_part.find('(') {
973 type_part[..paren_pos].trim()
974 } else {
975 type_part
976 };
977 if !is_valid_type_name(type_name) {
978 return Err(StreamError::syntax(
979 line_num,
980 format!("invalid type name: {type_name}"),
981 ));
982 }
983
984 let cols_str = &rest[bracket_start + 1..bracket_end];
985 let columns: Vec<String> = cols_str
986 .split(',')
987 .map(|s| s.trim().to_string())
988 .filter(|s| !s.is_empty())
989 .collect();
990
991 if columns.is_empty() {
992 return Err(StreamError::syntax(line_num, "empty schema"));
993 }
994
995 header.structs.insert(type_name.to_string(), columns);
996 Ok(())
997 }
998
999 fn parse_alias_directive(
1000 &self,
1001 line: &str,
1002 line_num: usize,
1003 header: &mut HeaderInfo,
1004 ) -> StreamResult<()> {
1005 let line = Self::strip_inline_comment(line);
1007 let rest = line.strip_prefix("%ALIAS").expect("prefix exists").trim();
1009 let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
1011
1012 let sep_pos = rest
1014 .find('=')
1015 .or_else(|| rest.find(':'))
1016 .ok_or_else(|| StreamError::syntax(line_num, "missing '=' or ':' in %ALIAS"))?;
1017
1018 let alias = rest[..sep_pos].trim();
1019 let value = rest[sep_pos + 1..].trim().trim_matches('"');
1020
1021 header.aliases.insert(alias.to_string(), value.to_string());
1022 Ok(())
1023 }
1024
1025 fn parse_nest_directive(
1026 &self,
1027 line: &str,
1028 line_num: usize,
1029 header: &mut HeaderInfo,
1030 ) -> StreamResult<()> {
1031 let line = Self::strip_inline_comment(line);
1033 let rest = line.strip_prefix("%NEST").expect("prefix exists").trim();
1035 let rest = rest.strip_prefix(':').unwrap_or(rest).trim();
1037
1038 let arrow_pos = rest
1039 .find('>')
1040 .ok_or_else(|| StreamError::syntax(line_num, "missing '>' in %NEST"))?;
1041
1042 let parent = rest[..arrow_pos].trim();
1043 let child = rest[arrow_pos + 1..].trim();
1044
1045 if !is_valid_type_name(parent) || !is_valid_type_name(child) {
1046 return Err(StreamError::syntax(line_num, "invalid type name in %NEST"));
1047 }
1048
1049 header.nests.insert(parent.to_string(), child.to_string());
1050 Ok(())
1051 }
1052
    /// Produces the next body event, or `None` once the document is complete.
    ///
    /// Main body loop: skips blanks/comments, enforces the indent-depth
    /// limit, closes contexts when indentation decreases (re-queueing the
    /// current line so further closes happen on subsequent calls), and
    /// otherwise parses the line into an event.
    fn next_event(&mut self) -> StreamResult<Option<NodeEvent>> {
        // After a fatal error the stream stays silent.
        if self.errored {
            return Ok(None);
        }
        // Input exhausted: emit remaining End events, then EndOfDocument.
        if self.finished {
            return self.finalize();
        }

        loop {
            self.operations_count += 1;
            // Amortize the clock read: poll the timeout every 100 iterations.
            if self.operations_count % 100 == 0 {
                self.check_timeout()?;
            }

            let (line_num, line) = if let Some(l) = self.reader.next_line()? {
                l
            } else {
                self.finished = true;
                return self.finalize();
            };

            let trimmed = line.trim();

            if trimmed.is_empty() || trimmed.starts_with('#') {
                continue;
            }

            let indent_info = calculate_indent(&line, line_num as u32)
                .map_err(|e| StreamError::syntax(line_num, e.to_string()))?;

            let (indent, content) = match indent_info {
                Some(info) => (info.level, &line[info.spaces..]),
                None => continue,
            };

            if indent > self.config.max_indent_depth {
                return Err(StreamError::syntax(
                    line_num,
                    format!("indent depth {indent} exceeds limit"),
                ));
            }

            // A dedent closes one context per call; push the line back so it
            // is re-read (and more contexts popped) on the next call.
            let events = self.pop_contexts(indent)?;
            if let Some(event) = events {
                self.reader.push_back(line_num, line);
                return Ok(Some(event));
            }

            return self.parse_line(content, indent, line_num);
        }
    }
1114
    /// Pops at most one context closed by a line at `current_indent` and
    /// returns its End event; `None` when nothing needs closing.
    ///
    /// Objects close when the new line is at or above their own indent.
    /// List rows sit exactly at `row_indent`, so lists close only when the
    /// line is strictly shallower than that.
    fn pop_contexts(&mut self, current_indent: usize) -> StreamResult<Option<NodeEvent>> {
        while self.state.stack.len() > 1 {
            let should_pop = match self.state.stack.last().expect("stack has elements") {
                Context::Root => false,
                Context::Object { indent, .. } => current_indent <= *indent,
                Context::List { row_indent, .. } => current_indent < *row_indent,
            };

            if should_pop {
                let ctx = self.state.stack.pop().expect("stack has elements");
                match ctx {
                    Context::List {
                        key,
                        type_name,
                        count,
                        ..
                    } => {
                        return Ok(Some(NodeEvent::ListEnd {
                            key,
                            type_name,
                            count,
                        }));
                    }
                    Context::Object { key, .. } => {
                        return Ok(Some(NodeEvent::ObjectEnd { key }));
                    }
                    Context::Root => {
                        // Unreachable in practice: should_pop is false for Root.
                    }
                }
            } else {
                break;
            }
        }

        Ok(None)
    }
1154
    /// Parses one body line into an event.
    ///
    /// `| ...` rows go to `parse_matrix_row`; a bare `key:` opens an object;
    /// `key: @Type...` opens a list; `key: value` yields a scalar.
    ///
    /// # Errors
    /// Syntax errors for invalid keys, a missing space after ':', wrong
    /// indentation, or a line with no ':' at all.
    fn parse_line(
        &mut self,
        content: &str,
        indent: usize,
        line_num: usize,
    ) -> StreamResult<Option<NodeEvent>> {
        let content = strip_comment(content);

        if let Some(row_content) = content.strip_prefix('|') {
            self.parse_matrix_row(row_content, indent, line_num)
        } else if let Some(colon_pos) = content.find(':') {
            let key = content[..colon_pos].trim();
            let after_colon = &content[colon_pos + 1..];

            if !is_valid_key_token(key) {
                return Err(StreamError::syntax(line_num, format!("invalid key: {key}")));
            }

            let after_colon_trimmed = after_colon.trim();

            if after_colon_trimmed.is_empty() {
                // `key:` with nothing after it opens a nested object.
                self.validate_indent_for_key_value(indent, line_num)?;

                self.state.stack.push(Context::Object {
                    key: key.to_string(),
                    indent,
                });
                Ok(Some(NodeEvent::ObjectStart {
                    key: key.to_string(),
                    line: line_num,
                }))
            } else if after_colon_trimmed.starts_with('@')
                && self.is_list_start(after_colon_trimmed)
            {
                // `key: @Type[...]` opens a list.
                if !after_colon.starts_with(' ') {
                    return Err(StreamError::syntax(
                        line_num,
                        "space required after ':' before '@'",
                    ));
                }

                let (type_name, schema) = self.parse_list_start(after_colon_trimmed, line_num)?;

                self.state.stack.push(Context::List {
                    key: key.to_string(),
                    type_name: type_name.clone(),
                    schema: schema.clone(),
                    row_indent: indent + 1,
                    count: 0,
                    last_node: None,
                });

                // A new list resets the `^` ditto baseline.
                self.state.prev_row = None;

                Ok(Some(NodeEvent::ListStart {
                    key: key.to_string(),
                    type_name,
                    schema,
                    line: line_num,
                }))
            } else {
                // `key: value` scalar.
                if !after_colon.starts_with(' ') {
                    return Err(StreamError::syntax(
                        line_num,
                        "space required after ':' in key-value",
                    ));
                }
                self.validate_indent_for_key_value(indent, line_num)?;

                let value = self.infer_value(after_colon.trim(), line_num)?;
                Ok(Some(NodeEvent::Scalar {
                    key: key.to_string(),
                    value,
                    line: line_num,
                }))
            }
        } else {
            Err(StreamError::syntax(line_num, "expected ':' in line"))
        }
    }
1243
1244 fn validate_indent_for_key_value(&self, indent: usize, line_num: usize) -> StreamResult<()> {
1251 let expected = match self.state.stack.last() {
1252 Some(Context::Root) | None => 0,
1253 Some(Context::Object {
1254 indent: parent_indent,
1255 ..
1256 }) => parent_indent + 1,
1257 Some(Context::List { .. }) => {
1258 return Err(StreamError::syntax(
1259 line_num,
1260 "cannot add key-value inside list context",
1261 ));
1262 }
1263 };
1264
1265 if indent != expected {
1266 return Err(StreamError::syntax(
1267 line_num,
1268 format!("expected indent level {expected}, got {indent}"),
1269 ));
1270 }
1271
1272 Ok(())
1273 }
1274
1275 #[inline]
1276 fn is_list_start(&self, s: &str) -> bool {
1277 let s = s.trim();
1278 if !s.starts_with('@') {
1279 return false;
1280 }
1281 let rest = &s[1..];
1282 let type_end = rest
1283 .find(|c: char| c == '[' || c.is_whitespace())
1284 .unwrap_or(rest.len());
1285 let type_name = &rest[..type_end];
1286 is_valid_type_name(type_name)
1287 }
1288
    /// Parses the `@Type` / `@Type[col, ...]` part of a list declaration.
    ///
    /// With an inline `[...]` schema, columns are validated and — when the
    /// header also declares `%STRUCT` for this type — checked for equality
    /// against the declared schema. Without one, the schema must come from
    /// the header's `%STRUCT` table.
    ///
    /// # Errors
    /// Syntax errors for bad names/brackets; schema errors for mismatched or
    /// undefined types.
    fn parse_list_start(&self, s: &str, line_num: usize) -> StreamResult<(String, Vec<String>)> {
        let s = s.trim();
        // Caller guarantees a leading '@' (see is_list_start), a 1-byte char.
        let rest = &s[1..];
        if let Some(bracket_pos) = rest.find('[') {
            let type_name = &rest[..bracket_pos];
            // Note: a ']' appearing before '[' would land inside `type_name`
            // and fail this validation, so the slice below cannot invert.
            if !is_valid_type_name(type_name) {
                return Err(StreamError::syntax(
                    line_num,
                    format!("invalid type name: {type_name}"),
                ));
            }

            let bracket_end = rest
                .find(']')
                .ok_or_else(|| StreamError::syntax(line_num, "missing ']'"))?;

            let cols_str = &rest[bracket_pos + 1..bracket_end];
            let mut columns = Vec::new();

            for part in cols_str.split(',') {
                let col = part.trim();
                if col.is_empty() {
                    continue;
                }
                if !is_valid_key_token(col) {
                    return Err(StreamError::syntax(
                        line_num,
                        format!("invalid column name: {col}"),
                    ));
                }
                columns.push(col.to_string());
            }

            if columns.is_empty() {
                return Err(StreamError::syntax(line_num, "empty inline schema"));
            }

            // Inline schema must agree with a %STRUCT declaration, if any.
            if let Some(header) = &self.header {
                if let Some(declared) = header.structs.get(type_name) {
                    if declared != &columns {
                        return Err(StreamError::schema(
                            line_num,
                            format!(
                                "inline schema for '{type_name}' doesn't match declared schema"
                            ),
                        ));
                    }
                }
            }

            Ok((type_name.to_string(), columns))
        } else {
            // No inline schema: look the type up in the header.
            let type_name = rest.trim();
            if !is_valid_type_name(type_name) {
                return Err(StreamError::syntax(
                    line_num,
                    format!("invalid type name: {type_name}"),
                ));
            }

            let header = self
                .header
                .as_ref()
                .ok_or_else(|| StreamError::Header("header not parsed".to_string()))?;

            let schema = header.structs.get(type_name).ok_or_else(|| {
                StreamError::schema(line_num, format!("undefined type: {type_name}"))
            })?;

            Ok((type_name.to_string(), schema.clone()))
        }
    }
1367
    /// Parses a `| ...` matrix row into a [`NodeEvent::Node`].
    ///
    /// Handles the optional `[N]` child-count prefix, `^` ditto cells
    /// (copied from the previous row), schema arity checking, and parent
    /// linkage for `%NEST`-nested rows.
    ///
    /// # Errors
    /// Shape mismatch when the cell count differs from the schema; syntax
    /// error when the first (ID) column is not a string; context errors from
    /// `find_list_context`.
    fn parse_matrix_row(
        &mut self,
        content: &str,
        indent: usize,
        line_num: usize,
    ) -> StreamResult<Option<NodeEvent>> {
        let (child_count, csv_content) = self.parse_row_prefix(content, line_num)?;
        let content = strip_comment(csv_content).trim();

        let (type_name, schema, parent_info) = self.find_list_context(indent, line_num)?;

        let fields = hedl_core::lex::parse_csv_row(content)
            .map_err(|e| StreamError::syntax(line_num, format!("row parse error: {e}")))?;

        if fields.len() != schema.len() {
            return Err(StreamError::ShapeMismatch {
                line: line_num,
                expected: schema.len(),
                got: fields.len(),
            });
        }

        let mut values = Vec::with_capacity(fields.len());
        for (col_idx, field) in fields.iter().enumerate() {
            let value = if field.value == "^" {
                // Ditto: repeat the same column of the previous row
                // (Null when there is no previous row or column).
                self.state
                    .prev_row
                    .as_ref()
                    .and_then(|prev| prev.get(col_idx).cloned())
                    .unwrap_or(Value::Null)
            } else if field.is_quoted {
                // Quoted cells are always strings, never type-inferred.
                Value::String(field.value.clone().into())
            } else {
                self.infer_value(&field.value, line_num)?
            };
            values.push(value);
        }

        // First column is the node ID and must be a string. Indexing is safe:
        // schemas are validated non-empty, and fields.len() == schema.len().
        let id = match &values[0] {
            Value::String(s) => s.to_string(),
            _ => return Err(StreamError::syntax(line_num, "ID column must be a string")),
        };

        self.update_list_context(&type_name, &id);
        self.state.prev_row = Some(values.clone());

        // Depth = number of enclosing List contexts, excluding the row's own.
        let depth = self
            .state
            .stack
            .iter()
            .filter(|ctx| matches!(ctx, Context::List { .. }))
            .count()
            .saturating_sub(1);

        let mut node = NodeInfo::new(type_name.clone(), id, values, depth, line_num);

        if let Some((parent_type, parent_id)) = parent_info {
            node = node.with_parent(parent_type, parent_id);
        }

        if let Some(count) = child_count {
            node = node.with_child_count(count);
        }

        Ok(Some(NodeEvent::Node(node)))
    }
1445
1446 fn parse_row_prefix<'a>(
1451 &self,
1452 content: &'a str,
1453 _line_num: usize,
1454 ) -> StreamResult<(Option<usize>, &'a str)> {
1455 if content.starts_with('[') {
1458 if let Some(bracket_end) = content.find(']') {
1459 let count_str = &content[1..bracket_end];
1460 if let Ok(count) = count_str.parse::<usize>() {
1461 let data = content[bracket_end + 1..].trim_start();
1464 return Ok((Some(count), data));
1465 }
1466 }
1468 }
1469
1470 Ok((None, content))
1472 }
1473
1474 fn find_list_context(
1475 &mut self,
1476 indent: usize,
1477 line_num: usize,
1478 ) -> StreamResult<ListContextResult> {
1479 let header = self
1480 .header
1481 .as_ref()
1482 .ok_or_else(|| StreamError::Header("header not parsed".to_string()))?;
1483
1484 for ctx in self.state.stack.iter().rev() {
1485 if let Context::List {
1486 type_name,
1487 schema,
1488 row_indent,
1489 last_node,
1490 ..
1491 } = ctx
1492 {
1493 if indent == *row_indent {
1494 return Ok((type_name.clone(), schema.clone(), None));
1496 } else if indent == *row_indent + 1 {
1497 let parent_info = last_node.clone().ok_or_else(|| {
1499 StreamError::orphan_row(line_num, "child row has no parent")
1500 })?;
1501
1502 let child_type = header.nests.get(type_name).ok_or_else(|| {
1503 StreamError::orphan_row(
1504 line_num,
1505 format!("no NEST rule for parent type '{type_name}'"),
1506 )
1507 })?;
1508
1509 let child_schema = header.structs.get(child_type).ok_or_else(|| {
1510 StreamError::schema(
1511 line_num,
1512 format!("child type '{child_type}' not defined"),
1513 )
1514 })?;
1515
1516 self.state.stack.push(Context::List {
1518 key: child_type.clone(),
1519 type_name: child_type.clone(),
1520 schema: child_schema.clone(),
1521 row_indent: indent,
1522 count: 0,
1523 last_node: None,
1524 });
1525
1526 return Ok((child_type.clone(), child_schema.clone(), Some(parent_info)));
1527 }
1528 }
1529 }
1530
1531 Err(StreamError::syntax(
1532 line_num,
1533 "matrix row outside of list context",
1534 ))
1535 }
1536
1537 fn update_list_context(&mut self, type_name: &str, id: &str) {
1538 for ctx in self.state.stack.iter_mut().rev() {
1539 if let Context::List {
1540 type_name: ctx_type,
1541 last_node,
1542 count,
1543 ..
1544 } = ctx
1545 {
1546 if ctx_type == type_name {
1547 *last_node = Some((type_name.to_string(), id.to_string()));
1548 *count += 1;
1549 break;
1550 }
1551 }
1552 }
1553 }
1554
1555 #[inline]
1556 fn infer_value(&self, s: &str, _line_num: usize) -> StreamResult<Value> {
1557 let s = s.trim();
1558
1559 if s.is_empty() || s == "~" || s == "null" {
1561 return Ok(Value::Null);
1562 }
1563
1564 if s == "true" {
1565 return Ok(Value::Bool(true));
1566 }
1567 if s == "false" {
1568 return Ok(Value::Bool(false));
1569 }
1570
1571 if let Some(ref_part) = s.strip_prefix('@') {
1573 if let Some(colon_pos) = ref_part.find(':') {
1574 let type_name = &ref_part[..colon_pos];
1575 let id = &ref_part[colon_pos + 1..];
1576 return Ok(Value::Reference(hedl_core::Reference {
1577 type_name: Some(type_name.to_string().into()),
1578 id: id.to_string().into(),
1579 }));
1580 }
1581 return Ok(Value::Reference(hedl_core::Reference {
1582 type_name: None,
1583 id: ref_part.to_string().into(),
1584 }));
1585 }
1586
1587 if let Some(alias) = s.strip_prefix('$') {
1589 if let Some(header) = &self.header {
1590 if let Some(value) = header.aliases.get(alias) {
1591 return Ok(Value::String(value.clone().into()));
1592 }
1593 }
1594 return Ok(Value::String(s.to_string().into()));
1595 }
1596
1597 if let Ok(i) = s.parse::<i64>() {
1599 return Ok(Value::Int(i));
1600 }
1601 if let Ok(f) = s.parse::<f64>() {
1602 return Ok(Value::Float(f));
1603 }
1604
1605 Ok(Value::String(s.to_string().into()))
1607 }
1608
    /// Emits one pending End event per call once input is exhausted,
    /// finishing with a single `EndOfDocument`; `None` thereafter.
    fn finalize(&mut self) -> StreamResult<Option<NodeEvent>> {
        if self.sent_end_of_document {
            return Ok(None);
        }

        // Unwind remaining contexts, one event per call.
        while self.state.stack.len() > 1 {
            let ctx = self.state.stack.pop().expect("stack has elements");
            match ctx {
                Context::List {
                    key,
                    type_name,
                    count,
                    ..
                } => {
                    return Ok(Some(NodeEvent::ListEnd {
                        key,
                        type_name,
                        count,
                    }));
                }
                Context::Object { key, .. } => {
                    return Ok(Some(NodeEvent::ObjectEnd { key }));
                }
                Context::Root => {
                    // Root only exists at the bottom of the stack; nothing to emit.
                }
            }
        }

        self.sent_end_of_document = true;
        Ok(Some(NodeEvent::EndOfDocument))
    }
1645}
1646
1647impl<R: Read> Iterator for StreamingParser<R> {
1648 type Item = StreamResult<NodeEvent>;
1649
1650 fn next(&mut self) -> Option<Self::Item> {
1651 match self.next_event() {
1652 Ok(Some(NodeEvent::EndOfDocument)) => None,
1653 Ok(Some(event)) => Some(Ok(event)),
1654 Ok(None) => None,
1655 Err(e) => {
1656 self.finished = true;
1658 self.errored = true;
1659 Some(Err(e))
1660 }
1661 }
1662 }
1663}
1664
#[cfg(feature = "compression")]
impl StreamingParser<crate::compression::CompressionReader<std::fs::File>> {
    /// Opens `path` with compression auto-detected from the file extension
    /// and the default parser configuration.
    ///
    /// # Errors
    /// I/O errors from opening the file, plus any header-parsing error.
    pub fn open<P: AsRef<std::path::Path>>(path: P) -> StreamResult<Self> {
        Self::open_with_config(path, StreamingParserConfig::default())
    }

    /// Opens `path` with extension-based compression detection and an
    /// explicit configuration.
    ///
    /// # Errors
    /// I/O errors from opening/wrapping the file, plus header-parsing errors.
    pub fn open_with_config<P: AsRef<std::path::Path>>(
        path: P,
        config: StreamingParserConfig,
    ) -> StreamResult<Self> {
        use crate::compression::{CompressionFormat, CompressionReader};

        let path = path.as_ref();
        let format = CompressionFormat::from_path(path);

        let file = std::fs::File::open(path).map_err(StreamError::Io)?;
        let reader = CompressionReader::with_format(file, format).map_err(StreamError::Io)?;

        Self::with_config(reader, config)
    }

    /// Opens `path` forcing a specific compression `format` (no detection),
    /// using the default configuration.
    ///
    /// # Errors
    /// I/O errors from opening/wrapping the file, plus header-parsing errors.
    pub fn open_with_compression<P: AsRef<std::path::Path>>(
        path: P,
        format: crate::compression::CompressionFormat,
    ) -> StreamResult<Self> {
        use crate::compression::CompressionReader;

        let file = std::fs::File::open(path).map_err(StreamError::Io)?;
        let reader = CompressionReader::with_format(file, format).map_err(StreamError::Io)?;

        Self::new(reader)
    }
}
1765
/// Fast search for raw `#` bytes (comment-start candidates).
///
/// This module only locates `#` byte positions; quote/escape semantics are
/// the caller's responsibility (see `strip_comment`).
mod simd_comment {
    #[cfg(all(target_arch = "x86_64", feature = "avx2"))]
    use std::arch::x86_64::{
        __m256i, _mm256_cmpeq_epi8, _mm256_loadu_si256, _mm256_movemask_epi8, _mm256_set1_epi8,
    };

    /// Returns the byte index of the first `#` in `s`, or `None`.
    ///
    /// Uses AVX2 when compiled with the `avx2` feature on x86_64 and the
    /// running CPU supports it; otherwise falls back to a scalar scan.
    #[inline]
    #[allow(unsafe_code)]
    pub fn find_hash_simd(s: &[u8]) -> Option<usize> {
        #[cfg(all(target_arch = "x86_64", feature = "avx2"))]
        {
            if is_x86_feature_detected!("avx2") {
                // SAFETY: AVX2 availability was just verified at runtime.
                return unsafe { find_hash_avx2(s) };
            }
        }

        find_hash_scalar(s)
    }

    /// AVX2 scan: compares 32-byte chunks against `#` via movemask; any tail
    /// shorter than 32 bytes is handled by the scalar fallback.
    ///
    /// # Safety
    /// The caller must ensure the CPU supports AVX2 (checked in
    /// `find_hash_simd` before dispatching here).
    #[cfg(all(target_arch = "x86_64", feature = "avx2"))]
    #[target_feature(enable = "avx2")]
    unsafe fn find_hash_avx2(s: &[u8]) -> Option<usize> {
        const CHUNK_SIZE: usize = 32;
        let len = s.len();

        if len == 0 {
            return None;
        }

        let hash_vec = _mm256_set1_epi8(b'#' as i8);
        let mut offset = 0;

        while offset + CHUNK_SIZE <= len {
            // SAFETY: loop condition guarantees offset + 32 <= len, so the
            // unaligned 32-byte load stays inside the slice; `loadu` imposes
            // no alignment requirement.
            let chunk = _mm256_loadu_si256(s.as_ptr().add(offset).cast::<__m256i>());

            let matches = _mm256_cmpeq_epi8(chunk, hash_vec);

            // One mask bit per byte; a set bit marks a matching `#`.
            let mask = _mm256_movemask_epi8(matches);

            if mask != 0 {
                // Lowest set bit = earliest match within this chunk.
                let bit_pos = mask.trailing_zeros() as usize;
                return Some(offset + bit_pos);
            }

            offset += CHUNK_SIZE;
        }

        find_hash_scalar(&s[offset..]).map(|pos| offset + pos)
    }

    /// Portable fallback: linear byte scan.
    #[inline]
    fn find_hash_scalar(s: &[u8]) -> Option<usize> {
        s.iter().position(|&b| b == b'#')
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        #[test]
        fn test_find_hash_basic() {
            assert_eq!(find_hash_simd(b"hello # world"), Some(6));
            assert_eq!(find_hash_simd(b"no comment"), None);
            assert_eq!(find_hash_simd(b"#start"), Some(0));
            assert_eq!(find_hash_simd(b"end#"), Some(3));
        }

        #[test]
        fn test_find_hash_long() {
            let long = b"a".repeat(100);
            assert_eq!(find_hash_simd(&long), None);

            let mut with_hash = b"a".repeat(50);
            with_hash.push(b'#');
            with_hash.extend_from_slice(&b"a".repeat(50));
            assert_eq!(find_hash_simd(&with_hash), Some(50));
        }

        #[test]
        fn test_find_hash_edge_cases() {
            assert_eq!(find_hash_simd(b""), None);
            assert_eq!(find_hash_simd(b"#"), Some(0));
            assert_eq!(find_hash_simd(b"##"), Some(0));
        }

        // Exercises every alignment of the '#' relative to the 32-byte chunks.
        #[test]
        fn test_find_hash_alignment() {
            for offset in 0..32 {
                let mut data = vec![b'a'; offset];
                data.push(b'#');
                data.extend_from_slice(&[b'b'; 32]);
                assert_eq!(find_hash_simd(&data), Some(offset));
            }
        }

        #[test]
        fn test_find_hash_multiple() {
            assert_eq!(find_hash_simd(b"# # #"), Some(0));
            assert_eq!(find_hash_simd(b"a # # #"), Some(2));
        }
    }
}
1897
/// Strips an unquoted, unescaped `#` comment from a body line, returning the
/// prefix right-trimmed (or the whole line when no comment exists).
///
/// Tracks `\` escapes and double-quoted spans only; unlike the header-side
/// `strip_inline_comment`, single quotes and brackets are NOT special here —
/// presumably intentional for body lines, but worth confirming.
#[inline]
pub(crate) fn strip_comment(s: &str) -> &str {
    let bytes = s.as_bytes();
    let mut in_quotes = false;
    let mut escape = false;
    let mut search_start = 0;

    loop {
        // Find the next raw '#' candidate (SIMD-accelerated).
        let hash_pos = match simd_comment::find_hash_simd(&bytes[search_start..]) {
            Some(pos) => search_start + pos,
            None => return s, // no '#' at all: nothing to strip
        };

        // Replay quote/escape state over the bytes before the candidate.
        // Byte-wise scanning is UTF-8 safe: b'"', b'\\' and b'#' never occur
        // as continuation bytes.
        for &c in &bytes[search_start..hash_pos] {
            if escape {
                escape = false;
                continue;
            }

            match c {
                b'\\' => escape = true,
                b'"' => in_quotes = !in_quotes,
                _ => {}
            }
        }

        // The candidate itself is escaped (`\#`): skip past it.
        if escape {
            escape = false;
            search_start = hash_pos + 1;
            continue;
        }

        if !in_quotes {
            return s[..hash_pos].trim_end();
        }

        // '#' inside a quoted span: keep searching after it.
        search_start = hash_pos + 1;
    }
}
1958
1959#[cfg(test)]
1960mod tests {
1961 use super::*;
1962 use std::io::Cursor;
1963
    // Happy path: all four directive kinds end up in the header.
    #[test]
    fn test_parse_header() {
        let input = r#"
%VERSION: 1.0
%STRUCT: User: [id, name, email]
%ALIAS active = "Active"
%NEST: User > Order
---
"#;
        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
        let header = parser.header().unwrap();

        assert_eq!(header.version, (1, 0));
        assert_eq!(
            header.structs.get("User"),
            Some(&vec![
                "id".to_string(),
                "name".to_string(),
                "email".to_string()
            ])
        );
        assert_eq!(header.aliases.get("active"), Some(&"Active".to_string()));
        assert_eq!(header.nests.get("User"), Some(&"Order".to_string()));
    }
1990
    // A header without %VERSION is rejected outright.
    #[test]
    fn test_header_missing_version() {
        let input = r"
%STRUCT: User: [id, name]
---
";
        let result = StreamingParser::new(Cursor::new(input));
        assert!(matches!(result, Err(StreamError::MissingVersion)));
    }

    // Non-numeric version payload.
    #[test]
    fn test_header_invalid_version_format() {
        let input = r"
%VERSION abc
---
";
        let result = StreamingParser::new(Cursor::new(input));
        assert!(matches!(result, Err(StreamError::InvalidVersion(_))));
    }

    // Version must be exactly major.minor, not a single number.
    #[test]
    fn test_header_version_single_number() {
        let input = r"
%VERSION 1
---
";
        let result = StreamingParser::new(Cursor::new(input));
        assert!(matches!(result, Err(StreamError::InvalidVersion(_))));
    }

    // Multiple %STRUCT declarations are all recorded.
    #[test]
    fn test_header_multiple_schemas() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: [id, name]
%STRUCT: Product: [id, title, price]
%STRUCT: Order: [id, user_id, product_id]
---
";
        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
        let header = parser.header().unwrap();

        assert_eq!(header.structs.len(), 3);
        assert!(header.structs.contains_key("User"));
        assert!(header.structs.contains_key("Product"));
        assert!(header.structs.contains_key("Order"));
    }

    // %STRUCT without brackets is a syntax error.
    #[test]
    fn test_header_struct_missing_bracket() {
        let input = r"
%VERSION: 1.0
%STRUCT User id, name
---
";
        let result = StreamingParser::new(Cursor::new(input));
        assert!(matches!(result, Err(StreamError::Syntax { .. })));
    }

    // An empty `[]` schema is a syntax error.
    #[test]
    fn test_header_empty_struct() {
        let input = r"
%VERSION: 1.0
%STRUCT: User: []
---
";
        let result = StreamingParser::new(Cursor::new(input));
        assert!(matches!(result, Err(StreamError::Syntax { .. })));
    }

    // %ALIAS needs '=' or ':' between name and value.
    #[test]
    fn test_header_alias_missing_equals() {
        let input = r#"
%VERSION: 1.0
%ALIAS foo "bar"
---
"#;
        let result = StreamingParser::new(Cursor::new(input));
        assert!(matches!(result, Err(StreamError::Syntax { .. })));
    }

    // %NEST needs the '>' arrow between parent and child.
    #[test]
    fn test_header_nest_missing_arrow() {
        let input = r"
%VERSION: 1.0
%NEST Parent Child
---
";
        let result = StreamingParser::new(Cursor::new(input));
        assert!(matches!(result, Err(StreamError::Syntax { .. })));
    }

    // Full-line and inline comments in the header are ignored.
    #[test]
    fn test_header_with_comments() {
        let input = r"
%VERSION: 1.0
# This is a comment
%STRUCT: User: [id, name] # inline comment
---
";
        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
        let header = parser.header().unwrap();
        assert!(header.structs.contains_key("User"));
    }

    // Blank lines between directives are ignored.
    #[test]
    fn test_header_blank_lines() {
        let input = r"
%VERSION: 1.0

%STRUCT: User: [id, name]

---
";
        let parser = StreamingParser::new(Cursor::new(input)).unwrap();
        let header = parser.header().unwrap();
        assert!(header.structs.contains_key("User"));
    }
2109
2110 #[test]
2113 fn test_streaming_nodes() {
2114 let input = r"
2115%VERSION: 1.0
2116%STRUCT: User: [id, name]
2117---
2118users: @User
2119 | alice, Alice Smith
2120 | bob, Bob Jones
2121";
2122 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2123
2124 let events: Vec<_> = parser.collect();
2125 for event in &events {
2126 if let Err(e) = event {
2127 eprintln!("Error: {e:?}");
2128 }
2129 }
2130 assert!(events.iter().all(std::result::Result::is_ok));
2131
2132 let nodes: Vec<_> = events
2133 .iter()
2134 .filter_map(|e| e.as_ref().ok())
2135 .filter_map(|e| e.as_node())
2136 .collect();
2137
2138 assert_eq!(nodes.len(), 2);
2139 assert_eq!(nodes[0].id, "alice");
2140 assert_eq!(nodes[1].id, "bob");
2141 }
2142
2143 #[test]
2144 fn test_streaming_empty_body() {
2145 let input = r"
2146%VERSION: 1.0
2147%STRUCT: User: [id, name]
2148---
2149";
2150 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2151 let events: Vec<_> = parser.collect();
2152 assert!(events.is_empty());
2153 }
2154
2155 #[test]
2156 fn test_streaming_list_start_end_events() {
2157 let input = r"
2158%VERSION: 1.0
2159%STRUCT: User: [id, name]
2160---
2161users: @User
2162 | alice, Alice
2163";
2164 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2165 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2166
2167 let list_starts: Vec<_> = events
2168 .iter()
2169 .filter(|e| matches!(e, NodeEvent::ListStart { .. }))
2170 .collect();
2171 let list_ends: Vec<_> = events
2172 .iter()
2173 .filter(|e| matches!(e, NodeEvent::ListEnd { .. }))
2174 .collect();
2175
2176 assert_eq!(list_starts.len(), 1);
2177 assert_eq!(list_ends.len(), 1);
2178
2179 if let NodeEvent::ListStart { key, type_name, .. } = &list_starts[0] {
2180 assert_eq!(key, "users");
2181 assert_eq!(type_name, "User");
2182 }
2183
2184 if let NodeEvent::ListEnd {
2185 type_name, count, ..
2186 } = &list_ends[0]
2187 {
2188 assert_eq!(type_name, "User");
2189 assert_eq!(*count, 1);
2190 }
2191 }
2192
2193 #[test]
2196 fn test_matrix_row_empty_fields() {
2197 let input = r"
2201%VERSION: 1.0
2202%STRUCT: Data: [id, optional, required]
2203---
2204data: @Data
2205 | row1, ~, value
2206";
2207 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2208 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2209 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2210
2211 assert_eq!(nodes.len(), 1);
2212 assert_eq!(nodes[0].id, "row1");
2213 assert_eq!(nodes[0].fields[1], Value::Null);
2214 }
2215
2216 #[test]
2217 fn test_matrix_row_quoted_fields() {
2218 let input = r#"
2219%VERSION: 1.0
2220%STRUCT: Data: [id, description]
2221---
2222data: @Data
2223 | row1, "Hello, World"
2224"#;
2225 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2226 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2227 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2228
2229 assert_eq!(nodes.len(), 1);
2230 assert_eq!(
2231 nodes[0].fields[1],
2232 Value::String("Hello, World".to_string().into())
2233 );
2234 }
2235
2236 #[test]
2237 fn test_matrix_row_shape_mismatch() {
2238 let input = r"
2239%VERSION: 1.0
2240%STRUCT: User: [id, name, email]
2241---
2242users: @User
2243 | alice, Alice
2244";
2245 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2246 let events: Vec<_> = parser.collect();
2247 let errors: Vec<_> = events.iter().filter(|e| e.is_err()).collect();
2248
2249 assert!(!errors.is_empty());
2250 if let Err(StreamError::ShapeMismatch { expected, got, .. }) = &errors[0] {
2251 assert_eq!(*expected, 3);
2252 assert_eq!(*got, 2);
2253 }
2254 }
2255
2256 #[test]
2257 fn test_matrix_row_references() {
2258 let input = r"
2259%VERSION: 1.0
2260%STRUCT: Order: [id, user]
2261---
2262orders: @Order
2263 | order1, @User:alice
2264 | order2, @bob
2265";
2266 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2267 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2268 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2269
2270 assert_eq!(nodes.len(), 2);
2271
2272 if let Value::Reference(r) = &nodes[0].fields[1] {
2273 assert_eq!(r.type_name.as_deref(), Some("User"));
2274 assert_eq!(&*r.id, "alice");
2275 } else {
2276 panic!("Expected reference");
2277 }
2278
2279 if let Value::Reference(r) = &nodes[1].fields[1] {
2280 assert_eq!(r.type_name, None);
2281 assert_eq!(&*r.id, "bob");
2282 } else {
2283 panic!("Expected reference");
2284 }
2285 }
2286
2287 #[test]
2288 fn test_matrix_row_booleans() {
2289 let input = r"
2290%VERSION: 1.0
2291%STRUCT: Flag: [id, active, verified]
2292---
2293flags: @Flag
2294 | flag1, true, false
2295";
2296 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2297 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2298 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2299
2300 assert_eq!(nodes.len(), 1);
2301 assert_eq!(nodes[0].fields[1], Value::Bool(true));
2302 assert_eq!(nodes[0].fields[2], Value::Bool(false));
2303 }
2304
2305 #[test]
2306 fn test_matrix_row_numbers() {
2307 let input = r"
2308%VERSION: 1.0
2309%STRUCT: Data: [id, int_val, float_val]
2310---
2311data: @Data
2312 | row1, 42, 3.5
2313 | row2, -100, -2.5
2314";
2315 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2316 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2317 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2318
2319 assert_eq!(nodes.len(), 2);
2320 assert_eq!(nodes[0].fields[1], Value::Int(42));
2321 assert_eq!(nodes[0].fields[2], Value::Float(3.5));
2322 assert_eq!(nodes[1].fields[1], Value::Int(-100));
2323 assert_eq!(nodes[1].fields[2], Value::Float(-2.5));
2324 }
2325
2326 #[test]
2327 fn test_matrix_row_null() {
2328 let input = r"
2329%VERSION: 1.0
2330%STRUCT: Data: [id, nullable]
2331---
2332data: @Data
2333 | row1, ~
2334";
2335 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2336 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2337 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2338
2339 assert_eq!(nodes.len(), 1);
2340 assert_eq!(nodes[0].fields[1], Value::Null);
2341 }
2342
2343 #[test]
2344 fn test_matrix_row_ditto() {
2345 let input = r"
2346%VERSION: 1.0
2347%STRUCT: Data: [id, category]
2348---
2349data: @Data
2350 | row1, CategoryA
2351 | row2, ^
2352 | row3, ^
2353";
2354 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2355 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2356 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2357
2358 assert_eq!(nodes.len(), 3);
2359 assert_eq!(
2360 nodes[0].fields[1],
2361 Value::String("CategoryA".to_string().into())
2362 );
2363 assert_eq!(
2364 nodes[1].fields[1],
2365 Value::String("CategoryA".to_string().into())
2366 );
2367 assert_eq!(
2368 nodes[2].fields[1],
2369 Value::String("CategoryA".to_string().into())
2370 );
2371 }
2372
2373 #[test]
2374 fn test_matrix_row_alias_substitution() {
2375 let input = r#"
2376%VERSION: 1.0
2377%ALIAS status = "Active"
2378%STRUCT: User: [id, status]
2379---
2380users: @User
2381 | alice, $status
2382"#;
2383 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2384 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2385 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2386
2387 assert_eq!(nodes.len(), 1);
2388 assert_eq!(
2389 nodes[0].fields[1],
2390 Value::String("Active".to_string().into())
2391 );
2392 }
2393
2394 #[test]
2397 fn test_inline_schema() {
2398 let input = r"
2399%VERSION: 1.0
2400---
2401items: @Item[id, name]
2402 | item1, First
2403 | item2, Second
2404";
2405 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2406 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2407 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2408
2409 assert_eq!(nodes.len(), 2);
2410 assert_eq!(nodes[0].type_name, "Item");
2411 }
2412
2413 #[test]
2414 fn test_inline_schema_mismatch_declared_produces_error() {
2415 let input = r"
2416%VERSION: 1.0
2417%STRUCT: Item: [id, name, extra]
2418---
2419items: @Item[id, name]
2420 | item1, First
2421";
2422 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2423 let events: Vec<_> = parser.collect::<Vec<_>>();
2424
2425 let has_schema_error = events.iter().any(|e| {
2427 if let Err(e) = e {
2428 matches!(e, StreamError::Schema { .. })
2429 } else {
2430 false
2431 }
2432 });
2433 assert!(has_schema_error, "expected schema mismatch error");
2434 }
2435
2436 #[test]
2437 fn test_inline_schema_matches_declared_works() {
2438 let input = r"
2439%VERSION: 1.0
2440%STRUCT: Item: [id, name]
2441---
2442items: @Item[id, name]
2443 | item1, First
2444";
2445 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2446 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2447 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2448
2449 assert_eq!(nodes.len(), 1);
2451 assert_eq!(nodes[0].fields.len(), 2);
2452 }
2453
2454 #[test]
2455 fn test_inline_schema_invalid_column_name() {
2456 let input = r"
2457%VERSION: 1.0
2458---
2459items: @Item[id, Invalid-Name, value]
2460 | item1, x, y
2461";
2462 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2463 let events: Vec<_> = parser.collect::<Vec<_>>();
2464
2465 let has_syntax_error = events.iter().any(|e| {
2467 if let Err(e) = e {
2468 matches!(e, StreamError::Syntax { .. })
2469 && format!("{e}").contains("invalid column name")
2470 } else {
2471 false
2472 }
2473 });
2474 assert!(has_syntax_error, "expected invalid column name error");
2475 }
2476
2477 #[test]
2478 fn test_inline_schema_empty() {
2479 let input = r"
2480%VERSION: 1.0
2481---
2482items: @Item[]
2483 | item1
2484";
2485 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2486 let events: Vec<_> = parser.collect::<Vec<_>>();
2487
2488 let has_syntax_error = events.iter().any(|e| {
2490 if let Err(e) = e {
2491 matches!(e, StreamError::Syntax { .. })
2492 && format!("{e}").contains("empty inline schema")
2493 } else {
2494 false
2495 }
2496 });
2497 assert!(has_syntax_error, "expected empty inline schema error");
2498 }
2499
2500 #[test]
2501 fn test_inline_schema_column_with_leading_digit() {
2502 let input = r"
2503%VERSION: 1.0
2504---
2505items: @Item[id, 123col]
2506 | item1, x
2507";
2508 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2509 let events: Vec<_> = parser.collect::<Vec<_>>();
2510
2511 let has_syntax_error = events.iter().any(|e| {
2513 if let Err(e) = e {
2514 matches!(e, StreamError::Syntax { .. })
2515 && format!("{e}").contains("invalid column name")
2516 } else {
2517 false
2518 }
2519 });
2520 assert!(
2521 has_syntax_error,
2522 "expected invalid column name error for leading digit"
2523 );
2524 }
2525
2526 #[test]
2529 fn test_object_context() {
2530 let input = r"
2531%VERSION: 1.0
2532%STRUCT: User: [id, name]
2533---
2534db:
2535 users: @User
2536 | alice, Alice
2537";
2538 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2539 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2540
2541 let obj_starts: Vec<_> = events
2542 .iter()
2543 .filter(|e| matches!(e, NodeEvent::ObjectStart { .. }))
2544 .collect();
2545 assert_eq!(obj_starts.len(), 1);
2546
2547 if let NodeEvent::ObjectStart { key, .. } = obj_starts[0] {
2548 assert_eq!(key, "db");
2549 }
2550 }
2551
2552 #[test]
2553 fn test_scalar_value() {
2554 let input = r#"
2555%VERSION: 1.0
2556---
2557config:
2558 timeout: 30
2559 enabled: true
2560 name: "Test Config"
2561"#;
2562 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2563 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2564
2565 let scalars: Vec<_> = events
2566 .iter()
2567 .filter(|e| matches!(e, NodeEvent::Scalar { .. }))
2568 .collect();
2569 assert_eq!(scalars.len(), 3);
2570 }
2571
2572 #[test]
2575 fn test_unicode_ids() {
2576 let input = r"
2577%VERSION: 1.0
2578%STRUCT: User: [id, name]
2579---
2580users: @User
2581 | 用户1, 张三
2582 | пользователь, Иван
2583";
2584 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2585 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2586 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2587
2588 assert_eq!(nodes.len(), 2);
2589 assert_eq!(nodes[0].id, "用户1");
2590 assert_eq!(nodes[1].id, "пользователь");
2591 }
2592
2593 #[test]
2594 fn test_unicode_in_values() {
2595 let input = r"
2596%VERSION: 1.0
2597%STRUCT: Data: [id, emoji]
2598---
2599data: @Data
2600 | row1, 🎉✨🚀
2601";
2602 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2603 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2604 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2605
2606 assert_eq!(nodes.len(), 1);
2607 assert_eq!(
2608 nodes[0].fields[1],
2609 Value::String("🎉✨🚀".to_string().into())
2610 );
2611 }
2612
2613 #[test]
2616 fn test_inline_comments() {
2617 let input = r"
2618%VERSION: 1.0
2619%STRUCT: User: [id, name]
2620---
2621users: @User # list of users
2622 | alice, Alice Smith # first user
2623 | bob, Bob Jones
2624";
2625 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2626 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2627 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2628
2629 assert_eq!(nodes.len(), 2);
2630 assert_eq!(nodes[0].id, "alice");
2632 }
2633
2634 #[test]
2635 fn test_full_line_comments() {
2636 let input = r"
2637%VERSION: 1.0
2638%STRUCT: User: [id, name]
2639---
2640# This is a comment
2641users: @User
2642 # Comment between rows
2643 | alice, Alice
2644 | bob, Bob
2645";
2646 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2647 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2648 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2649
2650 assert_eq!(nodes.len(), 2);
2651 }
2652
2653 #[test]
2654 fn test_hash_in_quoted_string() {
2655 let input =
2656 "%VERSION: 1.0\n%STRUCT: Data: [id, tag]\n---\ndata: @Data\n | row1, \"#hashtag\"\n";
2657 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2658 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2659 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2660
2661 assert_eq!(nodes.len(), 1);
2662 assert_eq!(
2663 nodes[0].fields[1],
2664 Value::String("#hashtag".to_string().into())
2665 );
2666 }
2667
2668 #[test]
2671 fn test_multiple_lists() {
2672 let input = r"
2673%VERSION: 1.0
2674%STRUCT: User: [id, name]
2675%STRUCT: Product: [id, title]
2676---
2677users: @User
2678 | alice, Alice
2679products: @Product
2680 | prod1, Widget
2681";
2682 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2683 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2684 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2685
2686 assert_eq!(nodes.len(), 2);
2687 assert_eq!(nodes[0].type_name, "User");
2688 assert_eq!(nodes[1].type_name, "Product");
2689 }
2690
2691 #[test]
2692 fn test_excessive_indent_error() {
2693 let config = StreamingParserConfig {
2694 max_indent_depth: 2,
2695 ..Default::default()
2696 };
2697 let input = r"
2698%VERSION: 1.0
2699%STRUCT: Data: [id]
2700---
2701level1:
2702 level2:
2703 level3:
2704 data: @Data
2705 | row1
2706";
2707 let parser = StreamingParser::with_config(Cursor::new(input), config).unwrap();
2708
2709 let mut found_indent_error = false;
2711 for result in parser {
2712 if let Err(StreamError::Syntax { message, .. }) = result {
2713 if message.contains("indent depth") {
2714 found_indent_error = true;
2715 break;
2716 }
2717 }
2718 }
2719 assert!(found_indent_error);
2720 }
2721
2722 #[test]
2725 fn test_undefined_schema() {
2726 let input = r"
2727%VERSION: 1.0
2728---
2729users: @User
2730 | alice, Alice
2731";
2732 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2733 let events: Vec<_> = parser.collect();
2734 let errors: Vec<_> = events.iter().filter(|e| e.is_err()).collect();
2735
2736 assert!(!errors.is_empty());
2738 }
2739
2740 #[test]
2741 fn test_orphan_row_without_context() {
2742 let input = r"
2743%VERSION: 1.0
2744%STRUCT: Data: [id]
2745---
2746| orphan_row
2747";
2748 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2749 let events: Vec<_> = parser.collect();
2750 let errors: Vec<_> = events.iter().filter(|e| e.is_err()).collect();
2751
2752 assert!(!errors.is_empty());
2753 }
2754
2755 #[test]
2756 fn test_missing_colon_error() {
2757 let input = r"
2758%VERSION: 1.0
2759---
2760invalid line without colon
2761";
2762 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2763 let events: Vec<_> = parser.collect();
2764 let errors: Vec<_> = events.iter().filter(|e| e.is_err()).collect();
2765
2766 assert!(!errors.is_empty());
2767 if let Err(StreamError::Syntax { message, .. }) = &errors[0] {
2768 assert!(message.contains(':'));
2769 }
2770 }
2771
2772 #[test]
2775 fn test_many_rows() {
2776 let mut input = String::from(
2777 r"
2778%VERSION: 1.0
2779%STRUCT: Data: [id, value]
2780---
2781data: @Data
2782",
2783 );
2784 for i in 0..1000 {
2785 input.push_str(&format!(" | row{i}, value{i}\n"));
2786 }
2787
2788 let parser = StreamingParser::new(Cursor::new(input)).unwrap();
2789 let events: Vec<_> = parser.filter_map(std::result::Result::ok).collect();
2790 let nodes: Vec<_> = events.iter().filter_map(|e| e.as_node()).collect();
2791
2792 assert_eq!(nodes.len(), 1000);
2793 }
2794
2795 #[test]
2798 fn test_strip_comment_basic() {
2799 assert_eq!(strip_comment("hello # comment"), "hello");
2800 }
2801
2802 #[test]
2803 fn test_strip_comment_quoted() {
2804 assert_eq!(
2805 strip_comment(r#""hello # not comment""#),
2806 r#""hello # not comment""#
2807 );
2808 }
2809
2810 #[test]
2811 fn test_strip_comment_escaped() {
2812 assert_eq!(
2814 strip_comment(r"hello\# not a comment"),
2815 r"hello\# not a comment"
2816 );
2817 assert_eq!(
2819 strip_comment(r"hello\# still here # comment"),
2820 r"hello\# still here"
2821 );
2822 }
2823
2824 #[test]
2825 fn test_strip_comment_escaped_in_quotes() {
2826 assert_eq!(
2828 strip_comment(r#""hello\#world" more"#),
2829 r#""hello\#world" more"#
2830 );
2831 }
2832
2833 #[test]
2834 fn test_strip_comment_no_comment() {
2835 assert_eq!(strip_comment("hello world"), "hello world");
2836 }
2837}