1use crate::document::{Item, MatrixList, Node};
46use crate::error::{HedlError, HedlResult};
47use crate::header::Header;
48use crate::inference::{infer_quoted_value, infer_value, InferenceContext};
49use crate::lex::row::parse_csv_row;
50use crate::lex::{calculate_indent, strip_comment};
51use crate::limits::Limits;
52use crate::reference::TypeRegistry;
53use crate::value::Value;
54use rayon::prelude::*;
55use std::collections::BTreeMap;
56use std::ops::Range;
57use std::sync::atomic::{AtomicUsize, Ordering};
58
/// Tuning knobs that decide when parsing switches from sequential to
/// parallel (rayon-based) execution.
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Master switch; when `false` all work stays sequential regardless of size.
    pub enabled: bool,
    /// Minimum number of root-level entities before root parsing goes parallel.
    pub min_root_entities: usize,
    /// Minimum number of matrix-list rows before row parsing goes parallel.
    pub min_list_rows: usize,
    /// Optional thread-pool size hint; `None` leaves the choice to the runtime.
    /// NOTE(review): not read anywhere in this file — confirm it is consumed
    /// by whoever builds the rayon pool.
    pub thread_pool_size: Option<usize>,
}
86
87impl Default for ParallelConfig {
88 fn default() -> Self {
89 Self {
90 enabled: true,
91 min_root_entities: 50,
92 min_list_rows: 100,
93 thread_pool_size: None,
94 }
95 }
96}
97
98impl ParallelConfig {
99 pub fn conservative() -> Self {
103 Self {
104 enabled: true,
105 min_root_entities: 100,
106 min_list_rows: 200,
107 thread_pool_size: None,
108 }
109 }
110
111 pub fn aggressive() -> Self {
115 Self {
116 enabled: true,
117 min_root_entities: 20,
118 min_list_rows: 50,
119 thread_pool_size: None,
120 }
121 }
122
123 pub fn should_parallelize_entities(&self, entity_count: usize) -> bool {
125 self.enabled && entity_count >= self.min_root_entities
126 }
127
128 pub fn should_parallelize_rows(&self, row_count: usize) -> bool {
130 self.enabled && row_count >= self.min_list_rows
131 }
132}
133
/// Thread-safe counters used to enforce global resource limits while worker
/// threads parse concurrently.
pub struct AtomicSecurityCounters {
    // Total nodes created so far, across all threads.
    node_count: AtomicUsize,
    // Total keys created so far, across all threads.
    total_keys: AtomicUsize,
}
142
143impl AtomicSecurityCounters {
144 pub fn new() -> Self {
146 Self {
147 node_count: AtomicUsize::new(0),
148 total_keys: AtomicUsize::new(0),
149 }
150 }
151
152 pub fn increment_nodes(&self, limits: &Limits, line_num: usize) -> HedlResult<()> {
156 let count = self.node_count.fetch_add(1, Ordering::Relaxed);
157 if count >= limits.max_nodes {
158 return Err(HedlError::security(
159 format!("too many nodes: exceeds limit of {}", limits.max_nodes),
160 line_num,
161 ));
162 }
163 Ok(())
164 }
165
166 pub fn increment_keys(&self, limits: &Limits, line_num: usize) -> HedlResult<()> {
168 let count = self.total_keys.fetch_add(1, Ordering::Relaxed);
169 if count >= limits.max_total_keys {
170 return Err(HedlError::security(
171 format!(
172 "too many total keys: {} exceeds limit {}",
173 count + 1,
174 limits.max_total_keys
175 ),
176 line_num,
177 ));
178 }
179 Ok(())
180 }
181
182 pub fn node_count(&self) -> usize {
184 self.node_count.load(Ordering::Relaxed)
185 }
186
187 pub fn key_count(&self) -> usize {
189 self.total_keys.load(Ordering::Relaxed)
190 }
191}
192
impl Default for AtomicSecurityCounters {
    /// Equivalent to [`AtomicSecurityCounters::new`]: both counters at zero.
    fn default() -> Self {
        Self::new()
    }
}
198
/// A root-level entity located by [`identify_entity_boundaries`], suitable for
/// independent (potentially parallel) parsing.
#[derive(Debug, Clone)]
pub struct EntityBoundary {
    /// Key of the entity (text before the `:` with any `(...)` suffix removed).
    pub key: String,
    /// Half-open range of *indices into the scanned line slice* (not physical
    /// line numbers) covering this entity's lines.
    pub line_range: Range<usize>,
    /// Coarse classification inferred from the text after the `:`.
    pub entity_type: EntityType,
}
212
/// Coarse kind of a root-level entity, derived from what follows its colon.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EntityType {
    /// Nothing after the colon: a nested object follows on indented lines.
    Object,
    /// `@Type...` after the colon: a matrix list declaration.
    List,
    /// Any other non-empty text after the colon: an inline scalar value.
    Scalar,
}
223
/// A contiguous batch of matrix rows belonging to one list declaration,
/// collected by [`collect_matrix_rows`].
#[derive(Debug)]
pub struct MatrixRowBatch<'a> {
    /// Type name from the `@TypeName[...]` declaration.
    pub type_name: String,
    /// Column names from the bracketed schema (empty if no bracket was given).
    pub schema: Vec<String>,
    /// `(physical line number, trimmed row text starting with '|')` pairs.
    pub rows: Vec<(usize, &'a str)>,
    /// True when any row may contain a ditto marker (a bare `"` cell copying
    /// the previous row's value); such batches must be parsed sequentially.
    pub has_ditto: bool,
}
238
239impl<'a> MatrixRowBatch<'a> {
240 pub fn can_parallelize(&self) -> bool {
244 !self.has_ditto && !self.rows.is_empty()
245 }
246}
247
248pub fn identify_entity_boundaries(lines: &[(usize, &str)]) -> Vec<EntityBoundary> {
253 let mut boundaries = Vec::new();
254 let mut current_key: Option<String> = None;
255 let mut current_start: usize = 0;
256 let mut current_type = EntityType::Object;
257
258 for (idx, &(line_num, line)) in lines.iter().enumerate() {
259 let trimmed = line.trim();
261 if trimmed.is_empty() || trimmed.starts_with('#') {
262 continue;
263 }
264
265 let indent_info = calculate_indent(line, line_num as u32).ok().flatten();
267 let indent = indent_info.map(|i| i.level).unwrap_or(0);
268
269 if indent == 0 && line.contains(':') {
270 if let Some(key) = current_key.take() {
272 boundaries.push(EntityBoundary {
274 key,
275 line_range: current_start..idx,
276 entity_type: current_type,
277 });
278 }
279
280 if let Some(colon_pos) = line.find(':') {
282 let key_part = &line[..colon_pos].trim();
283 let key = if let Some(paren_pos) = key_part.find('(') {
285 key_part[..paren_pos].trim()
286 } else {
287 key_part
288 };
289
290 current_key = Some(key.to_string());
291 current_start = idx;
292
293 let after_colon = line[colon_pos + 1..].trim();
295 current_type = if after_colon.starts_with('@') {
296 EntityType::List
297 } else if after_colon.is_empty() {
298 EntityType::Object
299 } else {
300 EntityType::Scalar
301 };
302 }
303 }
304 }
305
306 if let Some(key) = current_key {
308 boundaries.push(EntityBoundary {
309 key,
310 line_range: current_start..lines.len(),
311 entity_type: current_type,
312 });
313 }
314
315 boundaries
316}
317
318pub fn collect_matrix_rows<'a>(
323 lines: &'a [(usize, &str)],
324 list_start: usize,
325 expected_indent: usize,
326) -> MatrixRowBatch<'a> {
327 let mut rows = Vec::new();
328 let mut has_ditto = false;
329 let mut type_name = String::new();
330 let mut schema = Vec::new();
331
332 if list_start < lines.len() {
334 let (_, decl_line) = lines[list_start];
335 if let Some(colon_pos) = decl_line.find(':') {
336 let after_colon = decl_line[colon_pos + 1..].trim();
337 if let Some(rest) = after_colon.strip_prefix('@') {
338 if let Some(bracket_pos) = rest.find('[') {
340 type_name = rest[..bracket_pos].to_string();
341 let schema_str = &rest[bracket_pos..];
342 if schema_str.starts_with('[') && schema_str.ends_with(']') {
343 let inner = &schema_str[1..schema_str.len() - 1];
344 schema = inner.split(',').map(|s| s.trim().to_string()).collect();
345 }
346 } else {
347 type_name = rest.trim().to_string();
348 }
349 }
350 }
351 }
352
353 let mut i = list_start + 1;
355 while i < lines.len() {
356 let (line_num, line) = lines[i];
357
358 let indent_info = calculate_indent(line, line_num as u32).ok().flatten();
360 let indent = indent_info.map(|info| info.level).unwrap_or(0);
361
362 if indent < expected_indent {
364 break;
365 }
366
367 let content = line.trim();
368
369 if content.starts_with('|') {
371 if content.contains('"') && !content.contains("\"\"") {
373 let unquoted_part: String = content
375 .chars()
376 .scan(false, |in_quote, c| {
377 if c == '"' {
378 *in_quote = !*in_quote;
379 }
380 Some(if *in_quote { ' ' } else { c })
381 })
382 .collect();
383 if unquoted_part.contains('"') {
384 has_ditto = true;
385 }
386 }
387
388 rows.push((line_num, content));
389 } else if !content.is_empty() && !content.starts_with('#') {
390 break;
392 }
393
394 i += 1;
395 }
396
397 MatrixRowBatch {
398 type_name,
399 schema,
400 rows,
401 has_ditto,
402 }
403}
404
/// Parse a batch of matrix rows, in parallel when the batch allows it.
///
/// Falls back to [`parse_matrix_rows_sequential`] when the batch cannot be
/// parallelized (ditto markers present, or no rows). In the parallel path each
/// row is parsed independently with no access to the previous row's values.
///
/// NOTE(review): the sequential fallback does not touch `counters` — those
/// rows are checked only against the per-batch limit inside the sequential
/// function. Confirm whether global node accounting is intended to include
/// sequentially-parsed batches.
pub fn parse_matrix_rows_parallel(
    batch: &MatrixRowBatch<'_>,
    header: &Header,
    limits: &Limits,
    counters: &AtomicSecurityCounters,
) -> HedlResult<Vec<Node>> {
    if !batch.can_parallelize() {
        return parse_matrix_rows_sequential(batch, header, limits);
    }

    let nodes: Vec<HedlResult<Node>> = batch
        .rows
        .par_iter()
        .map(|(line_num, row_content)| {
            // Enforce the global node limit from every worker thread.
            counters.increment_nodes(limits, *line_num)?;

            parse_single_matrix_row(
                row_content,
                &batch.schema,
                &batch.type_name,
                header,
                *line_num,
            )
        })
        .collect();

    // Sequencing the per-row results yields the first error in row order.
    nodes.into_iter().collect()
}
442
/// Parse a batch of matrix rows in order, threading each row's parsed values
/// into the next so ditto markers can copy from the previous row.
///
/// Node-count limiting here is per-batch (a local counter checked against
/// `limits.max_nodes`), unlike the parallel path which uses the shared atomic
/// counters.
fn parse_matrix_rows_sequential(
    batch: &MatrixRowBatch<'_>,
    header: &Header,
    limits: &Limits,
) -> HedlResult<Vec<Node>> {
    let mut nodes = Vec::with_capacity(batch.rows.len());
    // Values of the previously parsed row, used to resolve ditto cells.
    let mut prev_values: Option<Vec<Value>> = None;
    let mut node_count = 0usize;

    for (line_num, row_content) in &batch.rows {
        // checked_add guards the (theoretical) usize overflow explicitly.
        node_count = node_count
            .checked_add(1)
            .ok_or_else(|| HedlError::security("node count overflow", *line_num))?;
        if node_count > limits.max_nodes {
            return Err(HedlError::security(
                format!("too many nodes: exceeds limit of {}", limits.max_nodes),
                *line_num,
            ));
        }

        let node = parse_single_matrix_row_with_ditto(
            row_content,
            &batch.schema,
            &batch.type_name,
            header,
            *line_num,
            prev_values.as_deref(),
        )?;

        // Keep a copy of this row's fields for the next row's ditto lookups.
        prev_values = Some(node.fields.to_vec());
        nodes.push(node);
    }

    Ok(nodes)
}
481
/// Parse one matrix row with no ditto support (`prev_values = None`).
///
/// Used by the parallel path, where rows are independent; a ditto cell in the
/// input would not be resolvable here.
fn parse_single_matrix_row(
    row_content: &str,
    schema: &[String],
    type_name: &str,
    header: &Header,
    line_num: usize,
) -> HedlResult<Node> {
    parse_single_matrix_row_with_ditto(row_content, schema, type_name, header, line_num, None)
}
494
495fn parse_single_matrix_row_with_ditto(
497 row_content: &str,
498 schema: &[String],
499 type_name: &str,
500 header: &Header,
501 line_num: usize,
502 prev_values: Option<&[Value]>,
503) -> HedlResult<Node> {
504 let content = row_content.strip_prefix('|').unwrap_or(row_content);
506
507 let (child_count, csv_content) = if content.starts_with('[') {
509 if let Some(bracket_end) = content.find(']') {
510 let count_str = &content[1..bracket_end];
511 if let Ok(count) = count_str.parse::<usize>() {
512 let data = content[bracket_end + 1..].trim_start();
513 (Some(count), data)
514 } else {
515 (None, content)
516 }
517 } else {
518 (None, content)
519 }
520 } else {
521 (None, content)
522 };
523
524 let csv_content = strip_comment(csv_content).trim();
525
526 let fields =
528 parse_csv_row(csv_content).map_err(|e| HedlError::syntax(e.to_string(), line_num))?;
529
530 if fields.len() != schema.len() {
532 return Err(HedlError::shape(
533 format!("expected {} columns, got {}", schema.len(), fields.len()),
534 line_num,
535 ));
536 }
537
538 let mut values = Vec::with_capacity(fields.len());
540 for (col_idx, field) in fields.iter().enumerate() {
541 let ctx =
542 InferenceContext::for_matrix_cell(&header.aliases, col_idx, prev_values, type_name);
543
544 let value = if field.is_quoted {
545 infer_quoted_value(&field.value)
546 } else {
547 infer_value(&field.value, &ctx, line_num)?
548 };
549
550 values.push(value);
551 }
552
553 let id = match &values[0] {
555 Value::String(s) => s.clone(),
556 _ => {
557 return Err(HedlError::semantic("ID column must be a string", line_num));
558 }
559 };
560
561 let mut node = Node::new(type_name, &*id, values);
563 if let Some(count) = child_count {
564 node.set_child_count(count);
565 }
566
567 Ok(node)
568}
569
/// Build a [`TypeRegistry`] of all node IDs under `items`, using one registry
/// per root item in parallel and then merging them in key order.
///
/// Small inputs (fewer than 10 root items) are handled sequentially to avoid
/// fork/join overhead. Duplicate IDs are surfaced either by per-item
/// registration or by [`merge_registry_into`] during the merge; because the
/// merge walks results in `BTreeMap` key order, the reported error is
/// deterministic.
pub fn collect_ids_parallel(
    items: &BTreeMap<String, Item>,
    limits: &Limits,
) -> HedlResult<TypeRegistry> {
    if items.len() < 10 {
        let mut registry = TypeRegistry::new();
        collect_ids_sequential(items, &mut registry, 0, limits.max_nest_depth, limits)?;
        return Ok(registry);
    }

    // One independent registry per root item; collect preserves item order.
    let local_registries: Vec<HedlResult<TypeRegistry>> = items
        .par_iter()
        .map(|(_, item)| {
            let mut local = TypeRegistry::new();
            collect_item_ids(item, &mut local, 0, limits.max_nest_depth, limits)?;
            Ok(local)
        })
        .collect();

    // Merge sequentially so cross-item duplicate detection is deterministic.
    let mut merged = TypeRegistry::new();
    for result in local_registries {
        let local = result?;
        merge_registry_into(&mut merged, local, limits)?;
    }

    Ok(merged)
}
604
605fn collect_ids_sequential(
607 items: &BTreeMap<String, Item>,
608 registry: &mut TypeRegistry,
609 depth: usize,
610 max_depth: usize,
611 limits: &Limits,
612) -> HedlResult<()> {
613 if depth > max_depth {
614 return Err(HedlError::security(
615 format!(
616 "NEST hierarchy depth {} exceeds maximum allowed depth {}",
617 depth, max_depth
618 ),
619 0,
620 ));
621 }
622
623 for item in items.values() {
624 collect_item_ids(item, registry, depth, max_depth, limits)?;
625 }
626
627 Ok(())
628}
629
630fn collect_item_ids(
632 item: &Item,
633 registry: &mut TypeRegistry,
634 depth: usize,
635 max_depth: usize,
636 limits: &Limits,
637) -> HedlResult<()> {
638 match item {
639 Item::List(list) => {
640 collect_list_ids(list, registry, depth, max_depth, limits)?;
641 }
642 Item::Object(obj) => {
643 collect_ids_sequential(obj, registry, depth + 1, max_depth, limits)?;
644 }
645 Item::Scalar(_) => {}
646 }
647 Ok(())
648}
649
/// Register every row ID of a matrix list, then descend into row children.
///
/// Two passes are used deliberately: all IDs at this level are registered
/// before any child is visited, so a duplicate among siblings is reported
/// before errors from deeper levels.
fn collect_list_ids(
    list: &MatrixList,
    registry: &mut TypeRegistry,
    depth: usize,
    max_depth: usize,
    limits: &Limits,
) -> HedlResult<()> {
    // Pass 1: register the IDs of every row at this level.
    for node in &list.rows {
        registry.register(&list.type_name, &node.id, 0, limits)?;
    }

    // Pass 2: recurse into each row's children, one nesting level deeper.
    for node in &list.rows {
        if let Some(children) = node.children() {
            for child_list in children.values() {
                for child in child_list {
                    collect_node_ids(child, registry, depth + 1, max_depth, limits)?;
                }
            }
        }
    }

    Ok(())
}
675
676fn collect_node_ids(
678 node: &Node,
679 registry: &mut TypeRegistry,
680 depth: usize,
681 max_depth: usize,
682 limits: &Limits,
683) -> HedlResult<()> {
684 if depth > max_depth {
685 return Err(HedlError::security(
686 format!(
687 "NEST hierarchy depth {} exceeds maximum allowed depth {}",
688 depth, max_depth
689 ),
690 0,
691 ));
692 }
693
694 registry.register(&node.type_name, &node.id, 0, limits)?;
695
696 if let Some(children) = node.children() {
697 for child_list in children.values() {
698 for child in child_list {
699 collect_node_ids(child, registry, depth + 1, max_depth, limits)?;
700 }
701 }
702 }
703
704 Ok(())
705}
706
707fn merge_registry_into(
709 target: &mut TypeRegistry,
710 source: TypeRegistry,
711 limits: &Limits,
712) -> HedlResult<()> {
713 for (id, types) in source.by_id_iter() {
715 for type_name in types {
716 if target.contains_in_type(type_name, id) {
718 return Err(HedlError::collision(
719 format!(
720 "duplicate ID '{}' in type '{}' detected during parallel merge",
721 id, type_name
722 ),
723 0,
724 ));
725 }
726 target.register(type_name, id, 0, limits)?;
728 }
729 }
730 Ok(())
731}
732
733pub fn validate_references_parallel(
738 items: &BTreeMap<String, Item>,
739 registries: &TypeRegistry,
740 strict: bool,
741 max_depth: usize,
742) -> HedlResult<()> {
743 if items.len() < 10 {
745 return validate_references_sequential(items, registries, strict, None, 0, max_depth);
746 }
747
748 let results: Vec<HedlResult<()>> = items
750 .par_iter()
751 .map(|(_, item)| validate_item_refs(item, registries, strict, None, 0, max_depth))
752 .collect();
753
754 for result in results {
756 result?;
757 }
758
759 Ok(())
760}
761
762fn validate_references_sequential(
764 items: &BTreeMap<String, Item>,
765 registries: &TypeRegistry,
766 strict: bool,
767 current_type: Option<&str>,
768 depth: usize,
769 max_depth: usize,
770) -> HedlResult<()> {
771 if depth > max_depth {
772 return Err(HedlError::security(
773 format!(
774 "NEST hierarchy depth {} exceeds maximum allowed depth {}",
775 depth, max_depth
776 ),
777 0,
778 ));
779 }
780
781 for item in items.values() {
782 validate_item_refs(item, registries, strict, current_type, depth, max_depth)?;
783 }
784
785 Ok(())
786}
787
788fn validate_item_refs(
790 item: &Item,
791 registries: &TypeRegistry,
792 strict: bool,
793 current_type: Option<&str>,
794 depth: usize,
795 max_depth: usize,
796) -> HedlResult<()> {
797 match item {
798 Item::Scalar(value) => {
799 validate_value_ref(value, registries, strict, current_type)?;
800 }
801 Item::List(list) => {
802 for node in &list.rows {
803 validate_node_refs(node, registries, strict, depth, max_depth)?;
804 }
805 }
806 Item::Object(obj) => {
807 validate_references_sequential(
808 obj,
809 registries,
810 strict,
811 current_type,
812 depth + 1,
813 max_depth,
814 )?;
815 }
816 }
817 Ok(())
818}
819
820fn validate_node_refs(
822 node: &Node,
823 registries: &TypeRegistry,
824 strict: bool,
825 depth: usize,
826 max_depth: usize,
827) -> HedlResult<()> {
828 if depth > max_depth {
829 return Err(HedlError::security(
830 format!(
831 "NEST hierarchy depth {} exceeds maximum allowed depth {}",
832 depth, max_depth
833 ),
834 0,
835 ));
836 }
837
838 for value in &node.fields {
839 validate_value_ref(value, registries, strict, Some(&node.type_name))?;
840 }
841
842 if let Some(children) = node.children() {
843 for child_list in children.values() {
844 for child in child_list {
845 validate_node_refs(child, registries, strict, depth + 1, max_depth)?;
846 }
847 }
848 }
849
850 Ok(())
851}
852
/// Validate a single value if it is a reference.
///
/// Resolution order for `@id`-style references:
/// 1. an explicit type on the reference itself,
/// 2. otherwise the ambient `current_type` (e.g. the enclosing row's type),
/// 3. otherwise an unqualified lookup across all types — which errors
///    immediately (regardless of `strict`) when more than one type matches.
///
/// Unresolved references are an error only in `strict` mode; non-reference
/// values always pass.
fn validate_value_ref(
    value: &Value,
    registries: &TypeRegistry,
    strict: bool,
    current_type: Option<&str>,
) -> HedlResult<()> {
    if let Value::Reference(ref_val) = value {
        let resolved = match &ref_val.type_name {
            // Case 1: reference carries its own type qualifier.
            Some(t) => registries.contains_in_type(t, &ref_val.id),
            None => match current_type {
                // Case 2: fall back to the ambient type context.
                Some(type_name) => registries.contains_in_type(type_name, &ref_val.id),
                // Case 3: unqualified — must match exactly one type.
                None => {
                    let matching_types = registries.lookup_unqualified(&ref_val.id).unwrap_or(&[]);
                    match matching_types.len() {
                        0 => false,
                        1 => true,
                        _ => {
                            // Ambiguity is always an error, even when not strict.
                            return Err(HedlError::reference(
                                format!(
                                    "Ambiguous unqualified reference '@{}' matches multiple types: [{}]",
                                    ref_val.id,
                                    matching_types.join(", ")
                                ),
                                0,
                            ));
                        }
                    }
                }
            },
        };

        if !resolved && strict {
            return Err(HedlError::reference(
                format!("unresolved reference {}", ref_val.to_ref_string()),
                0,
            ));
        }
    }

    Ok(())
}
895
#[cfg(test)]
mod tests {
    use super::*;

    // Default config: enabled, thresholds 50/100, no pool-size override.
    #[test]
    fn test_parallel_config_default() {
        let config = ParallelConfig::default();
        assert!(config.enabled);
        assert_eq!(config.min_root_entities, 50);
        assert_eq!(config.min_list_rows, 100);
        assert!(config.thread_pool_size.is_none());
    }

    // Conservative preset doubles the default thresholds.
    #[test]
    fn test_parallel_config_conservative() {
        let config = ParallelConfig::conservative();
        assert_eq!(config.min_root_entities, 100);
        assert_eq!(config.min_list_rows, 200);
    }

    // Aggressive preset lowers thresholds below the defaults.
    #[test]
    fn test_parallel_config_aggressive() {
        let config = ParallelConfig::aggressive();
        assert_eq!(config.min_root_entities, 20);
        assert_eq!(config.min_list_rows, 50);
    }

    // Threshold predicates are inclusive at the configured minimum.
    #[test]
    fn test_parallel_config_thresholds() {
        let config = ParallelConfig::default();

        assert!(!config.should_parallelize_entities(10));
        assert!(!config.should_parallelize_entities(49));
        assert!(config.should_parallelize_entities(50));
        assert!(config.should_parallelize_entities(100));

        assert!(!config.should_parallelize_rows(10));
        assert!(!config.should_parallelize_rows(99));
        assert!(config.should_parallelize_rows(100));
        assert!(config.should_parallelize_rows(1000));
    }

    // Counters start at zero and each increment is observable.
    #[test]
    fn test_atomic_counters() {
        let counters = AtomicSecurityCounters::new();
        assert_eq!(counters.node_count(), 0);
        assert_eq!(counters.key_count(), 0);

        let limits = Limits::default();

        counters.increment_nodes(&limits, 0).unwrap();
        assert_eq!(counters.node_count(), 1);

        counters.increment_keys(&limits, 0).unwrap();
        assert_eq!(counters.key_count(), 1);
    }

    // With max_nodes = 1, the first increment succeeds and the second errors.
    #[test]
    fn test_atomic_counters_limit_exceeded() {
        let counters = AtomicSecurityCounters::new();
        let limits = Limits {
            max_nodes: 1,
            ..Default::default()
        };

        counters.increment_nodes(&limits, 0).unwrap();

        let result = counters.increment_nodes(&limits, 0);
        assert!(result.is_err());
    }

    // A list, an object, and a scalar at root level yield three boundaries
    // with half-open index ranges into the line slice.
    #[test]
    fn test_entity_boundary_identification() {
        let lines: Vec<(usize, &str)> = vec![
            (1, "users: @User"),
            (2, "| alice"),
            (3, "| bob"),
            (4, "settings:"),
            (5, "  debug: true"),
            (6, "count: 42"),
        ];

        let boundaries = identify_entity_boundaries(&lines);
        assert_eq!(boundaries.len(), 3);

        assert_eq!(boundaries[0].key, "users");
        assert_eq!(boundaries[0].entity_type, EntityType::List);
        assert_eq!(boundaries[0].line_range, 0..3);

        assert_eq!(boundaries[1].key, "settings");
        assert_eq!(boundaries[1].entity_type, EntityType::Object);
        assert_eq!(boundaries[1].line_range, 3..5);

        assert_eq!(boundaries[2].key, "count");
        assert_eq!(boundaries[2].entity_type, EntityType::Scalar);
        assert_eq!(boundaries[2].line_range, 5..6);
    }

    // Non-empty batch without ditto markers may be parallelized.
    #[test]
    fn test_matrix_row_batch_no_ditto() {
        let batch = MatrixRowBatch {
            type_name: "User".to_string(),
            schema: vec!["id".to_string(), "name".to_string()],
            rows: vec![(1, "|alice, Alice"), (2, "|bob, Bob")],
            has_ditto: false,
        };

        assert!(batch.can_parallelize());
    }

    // Ditto markers force the sequential path.
    #[test]
    fn test_matrix_row_batch_with_ditto() {
        let batch = MatrixRowBatch {
            type_name: "User".to_string(),
            schema: vec!["id".to_string(), "name".to_string()],
            rows: vec![(1, "|alice, Alice"), (2, "|bob, \"")],
            has_ditto: true,
        };

        assert!(!batch.can_parallelize());
    }

    // A plain row yields a node with the first column as its ID.
    #[test]
    fn test_parse_single_matrix_row() {
        let header = Header::new((1, 0));
        let schema = vec!["id".to_string(), "name".to_string()];

        let node = parse_single_matrix_row("|alice, Alice", &schema, "User", &header, 1).unwrap();

        assert_eq!(node.id, "alice");
        assert_eq!(node.type_name, "User");
        assert_eq!(node.fields.len(), 2);
    }

    // A leading `[N]` prefix sets the node's expected child count.
    #[test]
    fn test_parse_single_matrix_row_with_child_count() {
        let header = Header::new((1, 0));
        let schema = vec!["id".to_string(), "name".to_string()];

        let node =
            parse_single_matrix_row("|[3] alice, Alice", &schema, "User", &header, 1).unwrap();

        assert_eq!(node.id, "alice");
        assert_eq!(node.child_count, 3);
    }
}