hedl_core/
parallel.rs

1// Dweve HEDL - Hierarchical Entity Data Language
2//
3// Copyright (c) 2025 Dweve IP B.V. and individual contributors.
4//
5// SPDX-License-Identifier: Apache-2.0
6//
7// Licensed under the Apache License, Version 2.0 (the "License");
8// you may not use this file except in compliance with the License.
9// You may obtain a copy of the License in the LICENSE file at the
10// root of this repository or at: http://www.apache.org/licenses/LICENSE-2.0
11//
12// Unless required by applicable law or agreed to in writing, software
13// distributed under the License is distributed on an "AS IS" BASIS,
14// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15// See the License for the specific language governing permissions and
16// limitations under the License.
17
18//! Parallel parsing infrastructure for hedl-core.
19//!
20//! This module provides data parallelism using rayon's work-stealing scheduler
21//! for improved throughput on multi-core systems. Parallel parsing achieves
22//! 2-4x speedup for documents with 1000+ entities while maintaining deterministic
23//! output and semantic equivalence with the sequential parser.
24//!
25//! # Features
26//!
27//! - **Parallel entity parsing**: Top-level entities parsed in parallel
28//! - **Parallel matrix row parsing**: Large matrix lists (100+ rows) parsed in parallel
29//! - **Parallel reference resolution**: ID collection and validation parallelized
30//! - **Adaptive thresholds**: Automatic fallback to sequential for small documents
31//!
32//! # Examples
33//!
34//! ```text
35//! use hedl_core::{parse_with_limits, ParseOptions, ParallelConfig};
36//!
37//! let opts = ParseOptions::builder()
38//!     .enable_parallel(true)
39//!     .parallel_thresholds(50, 100)  // Min entities, min rows
40//!     .build();
41//!
42//! let doc = parse_with_limits(input, opts)?;
43//! ```
44
45use crate::document::{Item, MatrixList, Node};
46use crate::error::{HedlError, HedlResult};
47use crate::header::Header;
48use crate::inference::{infer_quoted_value, infer_value, InferenceContext};
49use crate::lex::row::parse_csv_row;
50use crate::lex::{calculate_indent, strip_comment};
51use crate::limits::Limits;
52use crate::reference::TypeRegistry;
53use crate::value::Value;
54use rayon::prelude::*;
55use std::collections::BTreeMap;
56use std::ops::Range;
57use std::sync::atomic::{AtomicUsize, Ordering};
58
/// Configuration for parallel parsing behavior.
///
/// Controls when parallel parsing is activated and how many threads
/// are used. Adaptive thresholds prevent parallel overhead from
/// slowing down small documents: parallelism only pays off once the
/// per-item work outweighs the scheduling cost.
///
/// # Examples
///
/// ```text
/// let config = ParallelConfig {
///     enabled: true,
///     min_root_entities: 50,
///     min_list_rows: 100,
///     thread_pool_size: Some(4),
/// };
/// ```
#[derive(Debug, Clone)]
pub struct ParallelConfig {
    /// Enable parallel parsing (default: true if feature enabled)
    pub enabled: bool,
    /// Minimum root entities to trigger parallelism (default: 50)
    pub min_root_entities: usize,
    /// Minimum matrix rows to parse in parallel (default: 100)
    pub min_list_rows: usize,
    /// Thread pool size (None = auto-detect cores)
    pub thread_pool_size: Option<usize>,
}
86
87impl Default for ParallelConfig {
88    fn default() -> Self {
89        Self {
90            enabled: true,
91            min_root_entities: 50,
92            min_list_rows: 100,
93            thread_pool_size: None,
94        }
95    }
96}
97
98impl ParallelConfig {
99    /// Create configuration for conservative parallel parsing.
100    ///
101    /// Uses higher thresholds to minimize overhead for typical documents.
102    pub fn conservative() -> Self {
103        Self {
104            enabled: true,
105            min_root_entities: 100,
106            min_list_rows: 200,
107            thread_pool_size: None,
108        }
109    }
110
111    /// Create configuration for aggressive parallel parsing.
112    ///
113    /// Uses lower thresholds to maximize parallelism opportunities.
114    pub fn aggressive() -> Self {
115        Self {
116            enabled: true,
117            min_root_entities: 20,
118            min_list_rows: 50,
119            thread_pool_size: None,
120        }
121    }
122
123    /// Check if parallel entity parsing should be used.
124    pub fn should_parallelize_entities(&self, entity_count: usize) -> bool {
125        self.enabled && entity_count >= self.min_root_entities
126    }
127
128    /// Check if parallel row parsing should be used.
129    pub fn should_parallelize_rows(&self, row_count: usize) -> bool {
130        self.enabled && row_count >= self.min_list_rows
131    }
132}
133
/// Thread-safe security counters using atomic operations.
///
/// Tracks node counts and key counts across parallel parsing threads
/// without lock contention. Counters only ever increase; accesses use
/// `Ordering::Relaxed` (see the `impl`), as the counts are independent
/// tallies rather than synchronization points.
pub struct AtomicSecurityCounters {
    // Nodes created so far; checked against Limits::max_nodes.
    node_count: AtomicUsize,
    // Keys seen so far; checked against Limits::max_total_keys.
    total_keys: AtomicUsize,
}
142
143impl AtomicSecurityCounters {
144    /// Create new atomic counters starting at zero.
145    pub fn new() -> Self {
146        Self {
147            node_count: AtomicUsize::new(0),
148            total_keys: AtomicUsize::new(0),
149        }
150    }
151
152    /// Increment node count with security check.
153    ///
154    /// Returns error if limit is exceeded.
155    pub fn increment_nodes(&self, limits: &Limits, line_num: usize) -> HedlResult<()> {
156        let count = self.node_count.fetch_add(1, Ordering::Relaxed);
157        if count >= limits.max_nodes {
158            return Err(HedlError::security(
159                format!("too many nodes: exceeds limit of {}", limits.max_nodes),
160                line_num,
161            ));
162        }
163        Ok(())
164    }
165
166    /// Increment total keys count with security check.
167    pub fn increment_keys(&self, limits: &Limits, line_num: usize) -> HedlResult<()> {
168        let count = self.total_keys.fetch_add(1, Ordering::Relaxed);
169        if count >= limits.max_total_keys {
170            return Err(HedlError::security(
171                format!(
172                    "too many total keys: {} exceeds limit {}",
173                    count + 1,
174                    limits.max_total_keys
175                ),
176                line_num,
177            ));
178        }
179        Ok(())
180    }
181
182    /// Get current node count.
183    pub fn node_count(&self) -> usize {
184        self.node_count.load(Ordering::Relaxed)
185    }
186
187    /// Get current key count.
188    pub fn key_count(&self) -> usize {
189        self.total_keys.load(Ordering::Relaxed)
190    }
191}
192
193impl Default for AtomicSecurityCounters {
194    fn default() -> Self {
195        Self::new()
196    }
197}
198
/// Entity boundary for parallel parsing.
///
/// Represents a contiguous range of lines belonging to a single
/// top-level entity (object or list). Produced by
/// `identify_entity_boundaries`.
#[derive(Debug, Clone)]
pub struct EntityBoundary {
    /// Key name for this entity (any "(N)" count hint already stripped)
    pub key: String,
    /// Half-open index range (start..end) into the scanned line slice —
    /// these are slice indices, not 1-based document line numbers.
    pub line_range: Range<usize>,
    /// Type of entity (Object, List, or Scalar)
    pub entity_type: EntityType,
}
212
/// Type of entity for parallel parsing.
///
/// Classified from what follows the key's colon on the declaration
/// line: `@` introduces a list, an empty remainder introduces an
/// object, anything else is an inline scalar.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EntityType {
    /// Object with nested key-value pairs
    Object,
    /// Matrix list with rows
    List,
    /// Single scalar value
    Scalar,
}
223
/// Batch of matrix rows for parallel parsing.
///
/// Contains all information needed to parse rows independently.
/// Built by `collect_matrix_rows`; row text borrows from the input lines.
#[derive(Debug)]
pub struct MatrixRowBatch<'a> {
    /// Type name for this matrix list
    pub type_name: String,
    /// Schema columns
    pub schema: Vec<String>,
    /// Row data: (line_number, trimmed row content starting with '|')
    pub rows: Vec<(usize, &'a str)>,
    /// Whether ditto markers were found (requires sequential fallback,
    /// since a ditto cell copies its value from the previous row)
    pub has_ditto: bool,
}
238
239impl<'a> MatrixRowBatch<'a> {
240    /// Check if this batch is safe for parallel parsing.
241    ///
242    /// Returns false if ditto markers are present (requires sequential).
243    pub fn can_parallelize(&self) -> bool {
244        !self.has_ditto && !self.rows.is_empty()
245    }
246}
247
248/// Identify root-level entity boundaries in document body.
249///
250/// Scans lines to find where each top-level entity starts and ends.
251/// This is a sequential scan but very fast (O(n) with minimal work).
252pub fn identify_entity_boundaries(lines: &[(usize, &str)]) -> Vec<EntityBoundary> {
253    let mut boundaries = Vec::new();
254    let mut current_key: Option<String> = None;
255    let mut current_start: usize = 0;
256    let mut current_type = EntityType::Object;
257
258    for (idx, &(line_num, line)) in lines.iter().enumerate() {
259        // Skip blank/comment lines
260        let trimmed = line.trim();
261        if trimmed.is_empty() || trimmed.starts_with('#') {
262            continue;
263        }
264
265        // Check if this is a root-level line (no leading whitespace)
266        let indent_info = calculate_indent(line, line_num as u32).ok().flatten();
267        let indent = indent_info.map(|i| i.level).unwrap_or(0);
268
269        if indent == 0 && line.contains(':') {
270            // New root entity starting
271            if let Some(key) = current_key.take() {
272                // Finalize previous entity
273                boundaries.push(EntityBoundary {
274                    key,
275                    line_range: current_start..idx,
276                    entity_type: current_type,
277                });
278            }
279
280            // Extract key from line
281            if let Some(colon_pos) = line.find(':') {
282                let key_part = &line[..colon_pos].trim();
283                // Remove count hint if present: key(N) -> key
284                let key = if let Some(paren_pos) = key_part.find('(') {
285                    key_part[..paren_pos].trim()
286                } else {
287                    key_part
288                };
289
290                current_key = Some(key.to_string());
291                current_start = idx;
292
293                // Determine entity type from content after colon
294                let after_colon = line[colon_pos + 1..].trim();
295                current_type = if after_colon.starts_with('@') {
296                    EntityType::List
297                } else if after_colon.is_empty() {
298                    EntityType::Object
299                } else {
300                    EntityType::Scalar
301                };
302            }
303        }
304    }
305
306    // Finalize last entity
307    if let Some(key) = current_key {
308        boundaries.push(EntityBoundary {
309            key,
310            line_range: current_start..lines.len(),
311            entity_type: current_type,
312        });
313    }
314
315    boundaries
316}
317
318/// Collect matrix row content for parallel parsing.
319///
320/// Scans a matrix list to gather all row lines. Detects ditto markers
321/// which require sequential fallback.
322pub fn collect_matrix_rows<'a>(
323    lines: &'a [(usize, &str)],
324    list_start: usize,
325    expected_indent: usize,
326) -> MatrixRowBatch<'a> {
327    let mut rows = Vec::new();
328    let mut has_ditto = false;
329    let mut type_name = String::new();
330    let mut schema = Vec::new();
331
332    // Parse list declaration line to get type and schema
333    if list_start < lines.len() {
334        let (_, decl_line) = lines[list_start];
335        if let Some(colon_pos) = decl_line.find(':') {
336            let after_colon = decl_line[colon_pos + 1..].trim();
337            if let Some(rest) = after_colon.strip_prefix('@') {
338                // Extract type name and schema
339                if let Some(bracket_pos) = rest.find('[') {
340                    type_name = rest[..bracket_pos].to_string();
341                    let schema_str = &rest[bracket_pos..];
342                    if schema_str.starts_with('[') && schema_str.ends_with(']') {
343                        let inner = &schema_str[1..schema_str.len() - 1];
344                        schema = inner.split(',').map(|s| s.trim().to_string()).collect();
345                    }
346                } else {
347                    type_name = rest.trim().to_string();
348                }
349            }
350        }
351    }
352
353    // Collect row lines
354    let mut i = list_start + 1;
355    while i < lines.len() {
356        let (line_num, line) = lines[i];
357
358        // Check indentation
359        let indent_info = calculate_indent(line, line_num as u32).ok().flatten();
360        let indent = indent_info.map(|info| info.level).unwrap_or(0);
361
362        // Stop if we've dedented out of the list
363        if indent < expected_indent {
364            break;
365        }
366
367        let content = line.trim();
368
369        // Matrix rows start with |
370        if content.starts_with('|') {
371            // Check for ditto markers (")
372            if content.contains('"') && !content.contains("\"\"") {
373                // Single quote outside of escaped quotes = ditto marker
374                let unquoted_part: String = content
375                    .chars()
376                    .scan(false, |in_quote, c| {
377                        if c == '"' {
378                            *in_quote = !*in_quote;
379                        }
380                        Some(if *in_quote { ' ' } else { c })
381                    })
382                    .collect();
383                if unquoted_part.contains('"') {
384                    has_ditto = true;
385                }
386            }
387
388            rows.push((line_num, content));
389        } else if !content.is_empty() && !content.starts_with('#') {
390            // Non-row content (nested list or other) - stop collecting
391            break;
392        }
393
394        i += 1;
395    }
396
397    MatrixRowBatch {
398        type_name,
399        schema,
400        rows,
401        has_ditto,
402    }
403}
404
405/// Parse matrix rows in parallel.
406///
407/// Each row is parsed independently without shared mutable state.
408/// Returns parsed nodes without ID registration (done sequentially after).
409pub fn parse_matrix_rows_parallel(
410    batch: &MatrixRowBatch<'_>,
411    header: &Header,
412    limits: &Limits,
413    counters: &AtomicSecurityCounters,
414) -> HedlResult<Vec<Node>> {
415    if !batch.can_parallelize() {
416        // Fall back to sequential if ditto markers present
417        return parse_matrix_rows_sequential(batch, header, limits);
418    }
419
420    // Parse rows in parallel
421    let nodes: Vec<HedlResult<Node>> = batch
422        .rows
423        .par_iter()
424        .map(|(line_num, row_content)| {
425            // Check security limits (atomic)
426            counters.increment_nodes(limits, *line_num)?;
427
428            // Parse single row
429            parse_single_matrix_row(
430                row_content,
431                &batch.schema,
432                &batch.type_name,
433                header,
434                *line_num,
435            )
436        })
437        .collect();
438
439    // Collect results, propagating first error
440    nodes.into_iter().collect()
441}
442
443/// Parse matrix rows sequentially (fallback for ditto markers).
444fn parse_matrix_rows_sequential(
445    batch: &MatrixRowBatch<'_>,
446    header: &Header,
447    limits: &Limits,
448) -> HedlResult<Vec<Node>> {
449    let mut nodes = Vec::with_capacity(batch.rows.len());
450    let mut prev_values: Option<Vec<Value>> = None;
451    let mut node_count = 0usize;
452
453    for (line_num, row_content) in &batch.rows {
454        // Check node count limit
455        node_count = node_count
456            .checked_add(1)
457            .ok_or_else(|| HedlError::security("node count overflow", *line_num))?;
458        if node_count > limits.max_nodes {
459            return Err(HedlError::security(
460                format!("too many nodes: exceeds limit of {}", limits.max_nodes),
461                *line_num,
462            ));
463        }
464
465        // Parse with ditto support
466        let node = parse_single_matrix_row_with_ditto(
467            row_content,
468            &batch.schema,
469            &batch.type_name,
470            header,
471            *line_num,
472            prev_values.as_deref(),
473        )?;
474
475        prev_values = Some(node.fields.to_vec());
476        nodes.push(node);
477    }
478
479    Ok(nodes)
480}
481
/// Parse a single matrix row without ditto support.
///
/// Thin wrapper over `parse_single_matrix_row_with_ditto` that passes
/// no previous-row values. Used by the parallel path, where rows must
/// be independent of each other.
fn parse_single_matrix_row(
    row_content: &str,
    schema: &[String],
    type_name: &str,
    header: &Header,
    line_num: usize,
) -> HedlResult<Node> {
    parse_single_matrix_row_with_ditto(row_content, schema, type_name, header, line_num, None)
}
494
495/// Parse a single matrix row with optional ditto support.
496fn parse_single_matrix_row_with_ditto(
497    row_content: &str,
498    schema: &[String],
499    type_name: &str,
500    header: &Header,
501    line_num: usize,
502    prev_values: Option<&[Value]>,
503) -> HedlResult<Node> {
504    // Skip | prefix and extract optional child count
505    let content = row_content.strip_prefix('|').unwrap_or(row_content);
506
507    // Parse child count prefix if present: [N] data
508    let (child_count, csv_content) = if content.starts_with('[') {
509        if let Some(bracket_end) = content.find(']') {
510            let count_str = &content[1..bracket_end];
511            if let Ok(count) = count_str.parse::<usize>() {
512                let data = content[bracket_end + 1..].trim_start();
513                (Some(count), data)
514            } else {
515                (None, content)
516            }
517        } else {
518            (None, content)
519        }
520    } else {
521        (None, content)
522    };
523
524    let csv_content = strip_comment(csv_content).trim();
525
526    // Parse CSV fields
527    let fields =
528        parse_csv_row(csv_content).map_err(|e| HedlError::syntax(e.to_string(), line_num))?;
529
530    // Validate field count
531    if fields.len() != schema.len() {
532        return Err(HedlError::shape(
533            format!("expected {} columns, got {}", schema.len(), fields.len()),
534            line_num,
535        ));
536    }
537
538    // Infer values
539    let mut values = Vec::with_capacity(fields.len());
540    for (col_idx, field) in fields.iter().enumerate() {
541        let ctx =
542            InferenceContext::for_matrix_cell(&header.aliases, col_idx, prev_values, type_name);
543
544        let value = if field.is_quoted {
545            infer_quoted_value(&field.value)
546        } else {
547            infer_value(&field.value, &ctx, line_num)?
548        };
549
550        values.push(value);
551    }
552
553    // Extract ID from first column
554    let id = match &values[0] {
555        Value::String(s) => s.clone(),
556        _ => {
557            return Err(HedlError::semantic("ID column must be a string", line_num));
558        }
559    };
560
561    // Create node
562    let mut node = Node::new(type_name, &*id, values);
563    if let Some(count) = child_count {
564        node.set_child_count(count);
565    }
566
567    Ok(node)
568}
569
570/// Parallel ID collection from document tree.
571///
572/// Collects node IDs from independent subtrees in parallel, then merges
573/// registries sequentially with conflict detection.
574pub fn collect_ids_parallel(
575    items: &BTreeMap<String, Item>,
576    limits: &Limits,
577) -> HedlResult<TypeRegistry> {
578    // Skip parallelism for small documents
579    if items.len() < 10 {
580        let mut registry = TypeRegistry::new();
581        collect_ids_sequential(items, &mut registry, 0, limits.max_nest_depth, limits)?;
582        return Ok(registry);
583    }
584
585    // Collect IDs from each item in parallel
586    let local_registries: Vec<HedlResult<TypeRegistry>> = items
587        .par_iter()
588        .map(|(_, item)| {
589            let mut local = TypeRegistry::new();
590            collect_item_ids(item, &mut local, 0, limits.max_nest_depth, limits)?;
591            Ok(local)
592        })
593        .collect();
594
595    // Merge registries sequentially with conflict detection
596    let mut merged = TypeRegistry::new();
597    for result in local_registries {
598        let local = result?;
599        merge_registry_into(&mut merged, local, limits)?;
600    }
601
602    Ok(merged)
603}
604
605/// Sequential ID collection (fallback).
606fn collect_ids_sequential(
607    items: &BTreeMap<String, Item>,
608    registry: &mut TypeRegistry,
609    depth: usize,
610    max_depth: usize,
611    limits: &Limits,
612) -> HedlResult<()> {
613    if depth > max_depth {
614        return Err(HedlError::security(
615            format!(
616                "NEST hierarchy depth {} exceeds maximum allowed depth {}",
617                depth, max_depth
618            ),
619            0,
620        ));
621    }
622
623    for item in items.values() {
624        collect_item_ids(item, registry, depth, max_depth, limits)?;
625    }
626
627    Ok(())
628}
629
630/// Collect IDs from a single item.
631fn collect_item_ids(
632    item: &Item,
633    registry: &mut TypeRegistry,
634    depth: usize,
635    max_depth: usize,
636    limits: &Limits,
637) -> HedlResult<()> {
638    match item {
639        Item::List(list) => {
640            collect_list_ids(list, registry, depth, max_depth, limits)?;
641        }
642        Item::Object(obj) => {
643            collect_ids_sequential(obj, registry, depth + 1, max_depth, limits)?;
644        }
645        Item::Scalar(_) => {}
646    }
647    Ok(())
648}
649
650/// Collect IDs from a matrix list.
651fn collect_list_ids(
652    list: &MatrixList,
653    registry: &mut TypeRegistry,
654    depth: usize,
655    max_depth: usize,
656    limits: &Limits,
657) -> HedlResult<()> {
658    for node in &list.rows {
659        registry.register(&list.type_name, &node.id, 0, limits)?;
660    }
661
662    // Recurse into children
663    for node in &list.rows {
664        if let Some(children) = node.children() {
665            for child_list in children.values() {
666                for child in child_list {
667                    collect_node_ids(child, registry, depth + 1, max_depth, limits)?;
668                }
669            }
670        }
671    }
672
673    Ok(())
674}
675
676/// Collect IDs from a node and its children.
677fn collect_node_ids(
678    node: &Node,
679    registry: &mut TypeRegistry,
680    depth: usize,
681    max_depth: usize,
682    limits: &Limits,
683) -> HedlResult<()> {
684    if depth > max_depth {
685        return Err(HedlError::security(
686            format!(
687                "NEST hierarchy depth {} exceeds maximum allowed depth {}",
688                depth, max_depth
689            ),
690            0,
691        ));
692    }
693
694    registry.register(&node.type_name, &node.id, 0, limits)?;
695
696    if let Some(children) = node.children() {
697        for child_list in children.values() {
698            for child in child_list {
699                collect_node_ids(child, registry, depth + 1, max_depth, limits)?;
700            }
701        }
702    }
703
704    Ok(())
705}
706
707/// Merge source registry into target with conflict detection.
708fn merge_registry_into(
709    target: &mut TypeRegistry,
710    source: TypeRegistry,
711    limits: &Limits,
712) -> HedlResult<()> {
713    // Get IDs from source by iterating through the inverted index
714    for (id, types) in source.by_id_iter() {
715        for type_name in types {
716            // Check for conflict in target
717            if target.contains_in_type(type_name, id) {
718                return Err(HedlError::collision(
719                    format!(
720                        "duplicate ID '{}' in type '{}' detected during parallel merge",
721                        id, type_name
722                    ),
723                    0,
724                ));
725            }
726            // Register in target
727            target.register(type_name, id, 0, limits)?;
728        }
729    }
730    Ok(())
731}
732
733/// Parallel reference validation.
734///
735/// Validates references across document items in parallel.
736/// The registry is read-only at this point, so no synchronization needed.
737pub fn validate_references_parallel(
738    items: &BTreeMap<String, Item>,
739    registries: &TypeRegistry,
740    strict: bool,
741    max_depth: usize,
742) -> HedlResult<()> {
743    // Skip parallelism for small documents
744    if items.len() < 10 {
745        return validate_references_sequential(items, registries, strict, None, 0, max_depth);
746    }
747
748    // Validate each item in parallel
749    let results: Vec<HedlResult<()>> = items
750        .par_iter()
751        .map(|(_, item)| validate_item_refs(item, registries, strict, None, 0, max_depth))
752        .collect();
753
754    // Propagate first error
755    for result in results {
756        result?;
757    }
758
759    Ok(())
760}
761
762/// Sequential reference validation (fallback).
763fn validate_references_sequential(
764    items: &BTreeMap<String, Item>,
765    registries: &TypeRegistry,
766    strict: bool,
767    current_type: Option<&str>,
768    depth: usize,
769    max_depth: usize,
770) -> HedlResult<()> {
771    if depth > max_depth {
772        return Err(HedlError::security(
773            format!(
774                "NEST hierarchy depth {} exceeds maximum allowed depth {}",
775                depth, max_depth
776            ),
777            0,
778        ));
779    }
780
781    for item in items.values() {
782        validate_item_refs(item, registries, strict, current_type, depth, max_depth)?;
783    }
784
785    Ok(())
786}
787
788/// Validate references in a single item.
789fn validate_item_refs(
790    item: &Item,
791    registries: &TypeRegistry,
792    strict: bool,
793    current_type: Option<&str>,
794    depth: usize,
795    max_depth: usize,
796) -> HedlResult<()> {
797    match item {
798        Item::Scalar(value) => {
799            validate_value_ref(value, registries, strict, current_type)?;
800        }
801        Item::List(list) => {
802            for node in &list.rows {
803                validate_node_refs(node, registries, strict, depth, max_depth)?;
804            }
805        }
806        Item::Object(obj) => {
807            validate_references_sequential(
808                obj,
809                registries,
810                strict,
811                current_type,
812                depth + 1,
813                max_depth,
814            )?;
815        }
816    }
817    Ok(())
818}
819
820/// Validate references in a node.
821fn validate_node_refs(
822    node: &Node,
823    registries: &TypeRegistry,
824    strict: bool,
825    depth: usize,
826    max_depth: usize,
827) -> HedlResult<()> {
828    if depth > max_depth {
829        return Err(HedlError::security(
830            format!(
831                "NEST hierarchy depth {} exceeds maximum allowed depth {}",
832                depth, max_depth
833            ),
834            0,
835        ));
836    }
837
838    for value in &node.fields {
839        validate_value_ref(value, registries, strict, Some(&node.type_name))?;
840    }
841
842    if let Some(children) = node.children() {
843        for child_list in children.values() {
844            for child in child_list {
845                validate_node_refs(child, registries, strict, depth + 1, max_depth)?;
846            }
847        }
848    }
849
850    Ok(())
851}
852
853/// Validate a single reference value.
854fn validate_value_ref(
855    value: &Value,
856    registries: &TypeRegistry,
857    strict: bool,
858    current_type: Option<&str>,
859) -> HedlResult<()> {
860    if let Value::Reference(ref_val) = value {
861        let resolved = match &ref_val.type_name {
862            Some(t) => registries.contains_in_type(t, &ref_val.id),
863            None => match current_type {
864                Some(type_name) => registries.contains_in_type(type_name, &ref_val.id),
865                None => {
866                    let matching_types = registries.lookup_unqualified(&ref_val.id).unwrap_or(&[]);
867                    match matching_types.len() {
868                        0 => false,
869                        1 => true,
870                        _ => {
871                            return Err(HedlError::reference(
872                                format!(
873                                    "Ambiguous unqualified reference '@{}' matches multiple types: [{}]",
874                                    ref_val.id,
875                                    matching_types.join(", ")
876                                ),
877                                0,
878                            ));
879                        }
880                    }
881                }
882            },
883        };
884
885        if !resolved && strict {
886            return Err(HedlError::reference(
887                format!("unresolved reference {}", ref_val.to_ref_string()),
888                0,
889            ));
890        }
891    }
892
893    Ok(())
894}
895
#[cfg(test)]
mod tests {
    use super::*;

    // Default config: parallelism on with the documented 50/100 thresholds.
    #[test]
    fn test_parallel_config_default() {
        let config = ParallelConfig::default();
        assert!(config.enabled);
        assert_eq!(config.min_root_entities, 50);
        assert_eq!(config.min_list_rows, 100);
        assert!(config.thread_pool_size.is_none());
    }

    // Conservative preset doubles both thresholds relative to default.
    #[test]
    fn test_parallel_config_conservative() {
        let config = ParallelConfig::conservative();
        assert_eq!(config.min_root_entities, 100);
        assert_eq!(config.min_list_rows, 200);
    }

    // Aggressive preset halves both thresholds relative to default.
    #[test]
    fn test_parallel_config_aggressive() {
        let config = ParallelConfig::aggressive();
        assert_eq!(config.min_root_entities, 20);
        assert_eq!(config.min_list_rows, 50);
    }

    // Thresholds are inclusive: exactly min_* triggers parallelism.
    #[test]
    fn test_parallel_config_thresholds() {
        let config = ParallelConfig::default();

        assert!(!config.should_parallelize_entities(10));
        assert!(!config.should_parallelize_entities(49));
        assert!(config.should_parallelize_entities(50));
        assert!(config.should_parallelize_entities(100));

        assert!(!config.should_parallelize_rows(10));
        assert!(!config.should_parallelize_rows(99));
        assert!(config.should_parallelize_rows(100));
        assert!(config.should_parallelize_rows(1000));
    }

    // Counters start at zero and track increments independently.
    #[test]
    fn test_atomic_counters() {
        let counters = AtomicSecurityCounters::new();
        assert_eq!(counters.node_count(), 0);
        assert_eq!(counters.key_count(), 0);

        let limits = Limits::default();

        // Increment should work
        counters.increment_nodes(&limits, 0).unwrap();
        assert_eq!(counters.node_count(), 1);

        counters.increment_keys(&limits, 0).unwrap();
        assert_eq!(counters.key_count(), 1);
    }

    // The increment that would exceed max_nodes fails, not the one that reaches it.
    #[test]
    fn test_atomic_counters_limit_exceeded() {
        let counters = AtomicSecurityCounters::new();
        let limits = Limits {
            max_nodes: 1,
            ..Default::default()
        };

        // First increment succeeds
        counters.increment_nodes(&limits, 0).unwrap();

        // Second increment fails
        let result = counters.increment_nodes(&limits, 0);
        assert!(result.is_err());
    }

    // Boundaries classify list/object/scalar and use slice indices (0-based),
    // not the 1-based document line numbers in the tuples.
    #[test]
    fn test_entity_boundary_identification() {
        let lines: Vec<(usize, &str)> = vec![
            (1, "users: @User"),
            (2, "| alice"),
            (3, "| bob"),
            (4, "settings:"),
            (5, "  debug: true"),
            (6, "count: 42"),
        ];

        let boundaries = identify_entity_boundaries(&lines);
        assert_eq!(boundaries.len(), 3);

        assert_eq!(boundaries[0].key, "users");
        assert_eq!(boundaries[0].entity_type, EntityType::List);
        assert_eq!(boundaries[0].line_range, 0..3);

        assert_eq!(boundaries[1].key, "settings");
        assert_eq!(boundaries[1].entity_type, EntityType::Object);
        assert_eq!(boundaries[1].line_range, 3..5);

        assert_eq!(boundaries[2].key, "count");
        assert_eq!(boundaries[2].entity_type, EntityType::Scalar);
        assert_eq!(boundaries[2].line_range, 5..6);
    }

    // Without ditto markers, a non-empty batch may be parallelized.
    #[test]
    fn test_matrix_row_batch_no_ditto() {
        let batch = MatrixRowBatch {
            type_name: "User".to_string(),
            schema: vec!["id".to_string(), "name".to_string()],
            rows: vec![(1, "|alice, Alice"), (2, "|bob, Bob")],
            has_ditto: false,
        };

        assert!(batch.can_parallelize());
    }

    // Ditto markers force the sequential fallback.
    #[test]
    fn test_matrix_row_batch_with_ditto() {
        let batch = MatrixRowBatch {
            type_name: "User".to_string(),
            schema: vec!["id".to_string(), "name".to_string()],
            rows: vec![(1, "|alice, Alice"), (2, "|bob, \"")],
            has_ditto: true,
        };

        assert!(!batch.can_parallelize());
    }

    // A plain row yields the first column as the node ID.
    #[test]
    fn test_parse_single_matrix_row() {
        let header = Header::new((1, 0));
        let schema = vec!["id".to_string(), "name".to_string()];

        let node = parse_single_matrix_row("|alice, Alice", &schema, "User", &header, 1).unwrap();

        assert_eq!(node.id, "alice");
        assert_eq!(node.type_name, "User");
        assert_eq!(node.fields.len(), 2);
    }

    // A "[N]" prefix sets the expected child count on the node.
    #[test]
    fn test_parse_single_matrix_row_with_child_count() {
        let header = Header::new((1, 0));
        let schema = vec!["id".to_string(), "name".to_string()];

        let node =
            parse_single_matrix_row("|[3] alice, Alice", &schema, "User", &header, 1).unwrap();

        assert_eq!(node.id, "alice");
        assert_eq!(node.child_count, 3);
    }
}