Skip to main content

plexus_serde/plan/
validate.rs

1use crate::types::{ColDef, ColKind, Expr, Op, Plan};
2
3/// Errors detected during structural validation of a deserialized plan.
4#[derive(Debug, Clone, PartialEq, Eq)]
5pub enum PlanValidationError {
6    /// `root_op` index is out of bounds.
7    InvalidRootOp { root_op: u32, num_ops: u32 },
8    /// An op at `op_index` references an input that is out of bounds.
9    InputOutOfBounds {
10        op_index: u32,
11        field: &'static str,
12        input: u32,
13        num_ops: u32,
14    },
15    /// An op at `op_index` references a forward (or self) input, which would
16    /// create a cycle or depend on an op that hasn't been computed yet.
17    ForwardReference {
18        op_index: u32,
19        field: &'static str,
20        input: u32,
21    },
22    /// The plan contains no ops.
23    EmptyPlan,
24    /// An op has a column index reference outside the available input width.
25    ExprColOutOfBounds {
26        op_index: u32,
27        field: &'static str,
28        col_idx: u32,
29        input_width: u32,
30    },
31    /// An op references a key/column index outside the available input width.
32    ColumnIndexOutOfBounds {
33        op_index: u32,
34        field: &'static str,
35        col_idx: u32,
36        input_width: u32,
37    },
38    /// A schema-bearing op has inconsistent arity contracts.
39    SchemaArityMismatch {
40        op_index: u32,
41        field: &'static str,
42        expected: u32,
43        actual: u32,
44    },
45    /// Sort key and sort direction vectors differ in length.
46    SortArityMismatch { op_index: u32, keys: u32, dirs: u32 },
47    /// A graph_ref on a graph-touching op is malformed.
48    InvalidGraphRef {
49        op_index: u32,
50        field: &'static str,
51        value: String,
52    },
53    /// A field requiring a strictly positive integer received zero.
54    NonPositiveValue {
55        op_index: u32,
56        field: &'static str,
57        value: u32,
58    },
59    /// A column referenced by a field has the wrong `ColKind` for the op.
60    SchemaKindMismatch {
61        op_index: u32,
62        field: &'static str,
63        col_idx: u32,
64        expected_kind: ColKind,
65        actual_kind: ColKind,
66    },
67}
68
69impl core::fmt::Display for PlanValidationError {
70    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
71        match self {
72            Self::InvalidRootOp { root_op, num_ops } => {
73                write!(
74                    f,
75                    "root_op {root_op} is out of bounds (plan has {num_ops} ops)"
76                )
77            }
78            Self::InputOutOfBounds {
79                op_index,
80                field,
81                input,
82                num_ops,
83            } => write!(
84                f,
85                "op[{op_index}].{field} references index {input}, but plan has {num_ops} ops"
86            ),
87            Self::ForwardReference {
88                op_index,
89                field,
90                input,
91            } => write!(
92                f,
93                "op[{op_index}].{field} references index {input} (forward or self reference)"
94            ),
95            Self::EmptyPlan => write!(f, "plan contains no ops"),
96            Self::ExprColOutOfBounds {
97                op_index,
98                field,
99                col_idx,
100                input_width,
101            } => write!(
102                f,
103                "op[{op_index}].{field} references column {col_idx}, but input width is {input_width}"
104            ),
105            Self::ColumnIndexOutOfBounds {
106                op_index,
107                field,
108                col_idx,
109                input_width,
110            } => write!(
111                f,
112                "op[{op_index}].{field} index {col_idx} is out of bounds for input width {input_width}"
113            ),
114            Self::SchemaArityMismatch {
115                op_index,
116                field,
117                expected,
118                actual,
119            } => write!(
120                f,
121                "op[{op_index}].{field} arity mismatch: expected {expected}, got {actual}"
122            ),
123            Self::SortArityMismatch {
124                op_index,
125                keys,
126                dirs,
127            } => write!(
128                f,
129                "op[{op_index}] sort arity mismatch: {keys} key(s) vs {dirs} dir(s)"
130            ),
131            Self::InvalidGraphRef {
132                op_index,
133                field,
134                value,
135            } => write!(
136                f,
137                "op[{op_index}].{field} has invalid graph_ref `{value}` (expected [A-Za-z_][A-Za-z0-9_]*)"
138            ),
139            Self::NonPositiveValue {
140                op_index,
141                field,
142                value,
143            } => write!(f, "op[{op_index}].{field} must be > 0, got {value}"),
144            Self::SchemaKindMismatch {
145                op_index,
146                field,
147                col_idx,
148                expected_kind,
149                actual_kind,
150            } => write!(
151                f,
152                "op[{op_index}].{field} col[{col_idx}] kind mismatch: expected {expected_kind:?}, got {actual_kind:?}"
153            ),
154        }
155    }
156}
157
158impl std::error::Error for PlanValidationError {}
159
160/// Validate the structural integrity of a deserialized plan.
161///
162/// Checks that:
163/// - The plan is non-empty.
164/// - `root_op` is a valid index into the ops vector.
165/// - Every `input` / `lhs` / `rhs` reference is in-bounds.
166/// - No op references itself or a later op (no forward references = no cycles).
167///
168/// This function performs no semantic validation (e.g., type compatibility
169/// between connected ops). Call this immediately after deserialization to
170/// reject malformed plans before engine execution.
171pub fn validate_plan_structure(plan: &Plan) -> Result<(), Vec<PlanValidationError>> {
172    let mut errors = Vec::new();
173    let num_ops = plan.ops.len() as u32;
174    let mut output_widths: Vec<Option<u32>> = vec![None; plan.ops.len()];
175    let mut output_schemas: Vec<Option<Vec<ColDef>>> = vec![None; plan.ops.len()];
176
177    if plan.ops.is_empty() {
178        errors.push(PlanValidationError::EmptyPlan);
179        return Err(errors);
180    }
181
182    if plan.root_op >= num_ops {
183        errors.push(PlanValidationError::InvalidRootOp {
184            root_op: plan.root_op,
185            num_ops,
186        });
187    }
188
189    for (idx, op) in plan.ops.iter().enumerate() {
190        let op_index = idx as u32;
191        for (field, input) in op_input_refs(op) {
192            if input >= num_ops {
193                errors.push(PlanValidationError::InputOutOfBounds {
194                    op_index,
195                    field,
196                    input,
197                    num_ops,
198                });
199            } else if input >= op_index {
200                errors.push(PlanValidationError::ForwardReference {
201                    op_index,
202                    field,
203                    input,
204                });
205            }
206        }
207
208        validate_op_schema_contract(op, op_index, &output_widths, &mut errors);
209        validate_op_kind_contract(op, op_index, &output_schemas, &mut errors);
210        validate_op_graph_ref(op, op_index, &mut errors);
211        output_widths[idx] = op_output_width(op, &output_widths);
212        output_schemas[idx] = op_output_schema(op, &output_schemas);
213    }
214
215    if errors.is_empty() {
216        Ok(())
217    } else {
218        Err(errors)
219    }
220}
221
222fn is_simple_ident(s: &str) -> bool {
223    let mut chars = s.chars();
224    let Some(first) = chars.next() else {
225        return false;
226    };
227    if !(first == '_' || first.is_ascii_alphabetic()) {
228        return false;
229    }
230    chars.all(|c| c == '_' || c.is_ascii_alphanumeric())
231}
232
233fn is_valid_graph_ref_ident(value: &str) -> bool {
234    // Accept graph parameter variables: $ident
235    if let Some(rest) = value.strip_prefix('$') {
236        return !rest.is_empty() && is_simple_ident(rest);
237    }
238    // Accept simple identifiers and catalog-qualified dotted paths (a.b.c).
239    // Each dot-separated segment must be a valid simple identifier; spaces
240    // around dots (e.g. "a . b") are rejected because they fail segment checks.
241    !value.is_empty() && value.split('.').all(is_simple_ident)
242}
243
244fn validate_graph_ref_field(
245    op_index: u32,
246    field: &'static str,
247    graph_ref: &Option<String>,
248    errors: &mut Vec<PlanValidationError>,
249) {
250    if let Some(value) = graph_ref {
251        let trimmed = value.trim();
252        if trimmed.is_empty() || !is_valid_graph_ref_ident(trimmed) {
253            errors.push(PlanValidationError::InvalidGraphRef {
254                op_index,
255                field,
256                value: value.clone(),
257            });
258        }
259    }
260}
261
262fn validate_op_graph_ref(op: &Op, op_index: u32, errors: &mut Vec<PlanValidationError>) {
263    match op {
264        Op::ScanNodes { graph_ref, .. }
265        | Op::Expand { graph_ref, .. }
266        | Op::OptionalExpand { graph_ref, .. }
267        | Op::ExpandVarLen { graph_ref, .. } => {
268            validate_graph_ref_field(op_index, "graph_ref", graph_ref, errors);
269        }
270        _ => {}
271    }
272}
273
274fn input_width(output_widths: &[Option<u32>], input: u32) -> Option<u32> {
275    output_widths.get(input as usize).and_then(|v| *v)
276}
277
278fn schema_len(schema: &[crate::types::ColDef]) -> u32 {
279    schema.len() as u32
280}
281
282fn schema_width_opt(schema: &[crate::types::ColDef]) -> Option<u32> {
283    if schema.is_empty() {
284        None
285    } else {
286        Some(schema_len(schema))
287    }
288}
289
290fn expr_col_refs_within(
291    expr: &Expr,
292    input_width: u32,
293    op_index: u32,
294    field: &'static str,
295    errors: &mut Vec<PlanValidationError>,
296) {
297    match expr {
298        Expr::ColRef { idx } => {
299            if *idx >= input_width {
300                errors.push(PlanValidationError::ExprColOutOfBounds {
301                    op_index,
302                    field,
303                    col_idx: *idx,
304                    input_width,
305                });
306            }
307        }
308        Expr::PropAccess { col, .. } => {
309            if *col >= input_width {
310                errors.push(PlanValidationError::ExprColOutOfBounds {
311                    op_index,
312                    field,
313                    col_idx: *col,
314                    input_width,
315                });
316            }
317        }
318        Expr::Cmp { lhs, rhs, .. }
319        | Expr::And { lhs, rhs }
320        | Expr::Or { lhs, rhs }
321        | Expr::Arith { lhs, rhs, .. }
322        | Expr::VectorSimilarity { lhs, rhs, .. } => {
323            expr_col_refs_within(lhs, input_width, op_index, field, errors);
324            expr_col_refs_within(rhs, input_width, op_index, field, errors);
325        }
326        Expr::Not { expr }
327        | Expr::IsNull { expr }
328        | Expr::IsNotNull { expr }
329        | Expr::Exists { expr } => {
330            expr_col_refs_within(expr, input_width, op_index, field, errors);
331        }
332        Expr::StartsWith { expr, .. }
333        | Expr::EndsWith { expr, .. }
334        | Expr::Contains { expr, .. } => {
335            expr_col_refs_within(expr, input_width, op_index, field, errors);
336        }
337        Expr::In { expr, items } => {
338            expr_col_refs_within(expr, input_width, op_index, field, errors);
339            for item in items {
340                expr_col_refs_within(item, input_width, op_index, field, errors);
341            }
342        }
343        Expr::ListLiteral { items } => {
344            for item in items {
345                expr_col_refs_within(item, input_width, op_index, field, errors);
346            }
347        }
348        Expr::MapLiteral { entries } => {
349            for (_, value) in entries {
350                expr_col_refs_within(value, input_width, op_index, field, errors);
351            }
352        }
353        Expr::ListComprehension {
354            list,
355            predicate,
356            map,
357            ..
358        } => {
359            expr_col_refs_within(list, input_width, op_index, field, errors);
360            if let Some(pred) = predicate {
361                expr_col_refs_within(pred, input_width, op_index, field, errors);
362            }
363            expr_col_refs_within(map, input_width, op_index, field, errors);
364        }
365        Expr::Agg { expr, .. } => {
366            if let Some(inner) = expr {
367                expr_col_refs_within(inner, input_width, op_index, field, errors);
368            }
369        }
370        Expr::Case { arms, else_expr } => {
371            for (when_expr, then_expr) in arms {
372                expr_col_refs_within(when_expr, input_width, op_index, field, errors);
373                expr_col_refs_within(then_expr, input_width, op_index, field, errors);
374            }
375            if let Some(e) = else_expr {
376                expr_col_refs_within(e, input_width, op_index, field, errors);
377            }
378        }
379        Expr::Param { .. }
380        | Expr::IntLiteral(_)
381        | Expr::FloatLiteral(_)
382        | Expr::BoolLiteral(_)
383        | Expr::StringLiteral(_)
384        | Expr::NullLiteral => {}
385    }
386}
387
388fn check_col_idx(
389    op_index: u32,
390    field: &'static str,
391    col_idx: u32,
392    input_width: u32,
393    errors: &mut Vec<PlanValidationError>,
394) {
395    if col_idx >= input_width {
396        errors.push(PlanValidationError::ColumnIndexOutOfBounds {
397            op_index,
398            field,
399            col_idx,
400            input_width,
401        });
402    }
403}
404
405fn validate_op_schema_contract(
406    op: &Op,
407    op_index: u32,
408    output_widths: &[Option<u32>],
409    errors: &mut Vec<PlanValidationError>,
410) {
411    match op {
412        Op::ScanNodes { .. } | Op::ScanRels { .. } => {}
413        Op::Expand { input, src_col, .. }
414        | Op::OptionalExpand { input, src_col, .. }
415        | Op::SemiExpand { input, src_col, .. }
416        | Op::ExpandVarLen { input, src_col, .. } => {
417            if let Some(width) = input_width(output_widths, *input) {
418                check_col_idx(op_index, "src_col", *src_col, width, errors);
419            }
420        }
421        Op::Filter { input, predicate } => {
422            if let Some(width) = input_width(output_widths, *input) {
423                expr_col_refs_within(predicate, width, op_index, "predicate", errors);
424            }
425        }
426        Op::Project {
427            input,
428            exprs,
429            schema,
430        } => {
431            if !schema.is_empty() {
432                let expected = schema_len(schema);
433                let actual = exprs.len() as u32;
434                if expected != actual {
435                    errors.push(PlanValidationError::SchemaArityMismatch {
436                        op_index,
437                        field: "project.exprs_vs_schema",
438                        expected,
439                        actual,
440                    });
441                }
442            }
443            if let Some(width) = input_width(output_widths, *input) {
444                for expr in exprs {
445                    expr_col_refs_within(expr, width, op_index, "exprs", errors);
446                }
447            }
448        }
449        Op::Aggregate {
450            input, keys, aggs, ..
451        } => {
452            if let Some(width) = input_width(output_widths, *input) {
453                for key in keys {
454                    check_col_idx(op_index, "keys", *key, width, errors);
455                }
456                for agg in aggs {
457                    expr_col_refs_within(agg, width, op_index, "aggs", errors);
458                }
459            }
460        }
461        Op::Sort { input, keys, dirs } => {
462            if keys.len() != dirs.len() {
463                errors.push(PlanValidationError::SortArityMismatch {
464                    op_index,
465                    keys: keys.len() as u32,
466                    dirs: dirs.len() as u32,
467                });
468            }
469            if let Some(width) = input_width(output_widths, *input) {
470                for key in keys {
471                    check_col_idx(op_index, "keys", *key, width, errors);
472                }
473            }
474        }
475        Op::Unwind {
476            input, list_expr, ..
477        } => {
478            if let Some(width) = input_width(output_widths, *input) {
479                expr_col_refs_within(list_expr, width, op_index, "list_expr", errors);
480            }
481        }
482        Op::PathConstruct {
483            input, rel_cols, ..
484        } => {
485            if let Some(width) = input_width(output_widths, *input) {
486                for col in rel_cols {
487                    check_col_idx(op_index, "rel_cols", *col, width, errors);
488                }
489            }
490        }
491        Op::Union {
492            lhs, rhs, schema, ..
493        } => {
494            let out = schema_len(schema);
495            if out > 0 {
496                if let Some(lhs_w) = input_width(output_widths, *lhs) {
497                    if lhs_w != out {
498                        errors.push(PlanValidationError::SchemaArityMismatch {
499                            op_index,
500                            field: "union.lhs_vs_schema",
501                            expected: out,
502                            actual: lhs_w,
503                        });
504                    }
505                }
506                if let Some(rhs_w) = input_width(output_widths, *rhs) {
507                    if rhs_w != out {
508                        errors.push(PlanValidationError::SchemaArityMismatch {
509                            op_index,
510                            field: "union.rhs_vs_schema",
511                            expected: out,
512                            actual: rhs_w,
513                        });
514                    }
515                }
516            }
517        }
518        Op::CreateNode { input, props, .. } => {
519            if let Some(width) = input_width(output_widths, *input) {
520                expr_col_refs_within(props, width, op_index, "props", errors);
521            }
522        }
523        Op::CreateRel {
524            input,
525            src_col,
526            dst_col,
527            props,
528            ..
529        } => {
530            if let Some(width) = input_width(output_widths, *input) {
531                if *src_col >= 0 {
532                    check_col_idx(op_index, "src_col", *src_col as u32, width, errors);
533                }
534                if *dst_col >= 0 {
535                    check_col_idx(op_index, "dst_col", *dst_col as u32, width, errors);
536                }
537                expr_col_refs_within(props, width, op_index, "props", errors);
538            }
539        }
540        Op::Merge {
541            input,
542            pattern,
543            on_create_props,
544            on_match_props,
545            ..
546        } => {
547            if let Some(width) = input_width(output_widths, *input) {
548                expr_col_refs_within(pattern, width, op_index, "pattern", errors);
549                expr_col_refs_within(on_create_props, width, op_index, "on_create_props", errors);
550                expr_col_refs_within(on_match_props, width, op_index, "on_match_props", errors);
551            }
552        }
553        Op::Delete {
554            input, target_col, ..
555        } => {
556            if let Some(width) = input_width(output_widths, *input) {
557                if *target_col >= 0 {
558                    check_col_idx(op_index, "target_col", *target_col as u32, width, errors);
559                }
560            }
561        }
562        Op::SetProperty {
563            input,
564            target_col,
565            value_expr,
566            ..
567        } => {
568            if let Some(width) = input_width(output_widths, *input) {
569                if *target_col >= 0 {
570                    check_col_idx(op_index, "target_col", *target_col as u32, width, errors);
571                }
572                expr_col_refs_within(value_expr, width, op_index, "value_expr", errors);
573            }
574        }
575        Op::RemoveProperty {
576            input, target_col, ..
577        } => {
578            if let Some(width) = input_width(output_widths, *input) {
579                if *target_col >= 0 {
580                    check_col_idx(op_index, "target_col", *target_col as u32, width, errors);
581                }
582            }
583        }
584        Op::VectorScan {
585            input,
586            query_vector,
587            top_k,
588            ..
589        } => {
590            if *top_k == 0 {
591                errors.push(PlanValidationError::NonPositiveValue {
592                    op_index,
593                    field: "top_k",
594                    value: *top_k,
595                });
596            }
597            if let Some(width) = input_width(output_widths, *input) {
598                expr_col_refs_within(query_vector, width, op_index, "query_vector", errors);
599            }
600        }
601        Op::Rerank {
602            input,
603            score_expr,
604            top_k,
605            ..
606        } => {
607            if *top_k == 0 {
608                errors.push(PlanValidationError::NonPositiveValue {
609                    op_index,
610                    field: "top_k",
611                    value: *top_k,
612                });
613            }
614            if let Some(width) = input_width(output_widths, *input) {
615                expr_col_refs_within(score_expr, width, op_index, "score_expr", errors);
616            }
617        }
618        Op::BlockMarker { .. } | Op::Limit { .. } | Op::Return { .. } | Op::ConstRow => {}
619    }
620}
621
622fn input_schema(output_schemas: &[Option<Vec<ColDef>>], input: u32) -> Option<&Vec<ColDef>> {
623    output_schemas.get(input as usize).and_then(|o| o.as_ref())
624}
625
626fn check_col_kind(
627    op_index: u32,
628    field: &'static str,
629    col_idx: i32,
630    schema: &[ColDef],
631    allowed: &[ColKind],
632    errors: &mut Vec<PlanValidationError>,
633) {
634    if col_idx < 0 {
635        return;
636    }
637    let idx = col_idx as usize;
638    if let Some(col) = schema.get(idx) {
639        if !allowed.contains(&col.kind) {
640            errors.push(PlanValidationError::SchemaKindMismatch {
641                op_index,
642                field,
643                col_idx: idx as u32,
644                expected_kind: allowed[0],
645                actual_kind: col.kind,
646            });
647        }
648    }
649}
650
651fn validate_op_kind_contract(
652    op: &Op,
653    op_index: u32,
654    output_schemas: &[Option<Vec<ColDef>>],
655    errors: &mut Vec<PlanValidationError>,
656) {
657    match op {
658        Op::Expand { input, src_col, .. }
659        | Op::OptionalExpand { input, src_col, .. }
660        | Op::SemiExpand { input, src_col, .. }
661        | Op::ExpandVarLen { input, src_col, .. } => {
662            if let Some(schema) = input_schema(output_schemas, *input) {
663                check_col_kind(
664                    op_index,
665                    "src_col",
666                    *src_col as i32,
667                    schema,
668                    &[ColKind::Node],
669                    errors,
670                );
671            }
672        }
673        Op::CreateRel {
674            input,
675            src_col,
676            dst_col,
677            ..
678        } => {
679            if let Some(schema) = input_schema(output_schemas, *input) {
680                check_col_kind(
681                    op_index,
682                    "src_col",
683                    *src_col,
684                    schema,
685                    &[ColKind::Node],
686                    errors,
687                );
688                check_col_kind(
689                    op_index,
690                    "dst_col",
691                    *dst_col,
692                    schema,
693                    &[ColKind::Node],
694                    errors,
695                );
696            }
697        }
698        Op::Delete {
699            input, target_col, ..
700        }
701        | Op::SetProperty {
702            input, target_col, ..
703        }
704        | Op::RemoveProperty {
705            input, target_col, ..
706        } => {
707            if let Some(schema) = input_schema(output_schemas, *input) {
708                check_col_kind(
709                    op_index,
710                    "target_col",
711                    *target_col,
712                    schema,
713                    &[ColKind::Node, ColKind::Rel],
714                    errors,
715                );
716            }
717        }
718        _ => {}
719    }
720}
721
722fn op_output_schema(op: &Op, output_schemas: &[Option<Vec<ColDef>>]) -> Option<Vec<ColDef>> {
723    match op {
724        Op::ScanNodes { schema, .. }
725        | Op::ScanRels { schema, .. }
726        | Op::Expand { schema, .. }
727        | Op::OptionalExpand { schema, .. }
728        | Op::SemiExpand { schema, .. }
729        | Op::ExpandVarLen { schema, .. }
730        | Op::Project { schema, .. }
731        | Op::Aggregate { schema, .. }
732        | Op::Unwind { schema, .. }
733        | Op::PathConstruct { schema, .. }
734        | Op::Union { schema, .. }
735        | Op::CreateNode { schema, .. }
736        | Op::CreateRel { schema, .. }
737        | Op::Merge { schema, .. }
738        | Op::Delete { schema, .. }
739        | Op::SetProperty { schema, .. }
740        | Op::RemoveProperty { schema, .. }
741        | Op::VectorScan { schema, .. }
742        | Op::Rerank { schema, .. } => {
743            if schema.is_empty() {
744                None
745            } else {
746                Some(schema.clone())
747            }
748        }
749        Op::Filter { input, .. }
750        | Op::BlockMarker { input, .. }
751        | Op::Sort { input, .. }
752        | Op::Limit { input, .. }
753        | Op::Return { input } => output_schemas.get(*input as usize).and_then(|o| o.clone()),
754        Op::ConstRow => Some(vec![]),
755    }
756}
757
758fn op_output_width(op: &Op, output_widths: &[Option<u32>]) -> Option<u32> {
759    match op {
760        Op::ScanNodes { schema, .. }
761        | Op::ScanRels { schema, .. }
762        | Op::Expand { schema, .. }
763        | Op::OptionalExpand { schema, .. }
764        | Op::SemiExpand { schema, .. }
765        | Op::ExpandVarLen { schema, .. }
766        | Op::Project { schema, .. }
767        | Op::Aggregate { schema, .. }
768        | Op::Unwind { schema, .. }
769        | Op::PathConstruct { schema, .. }
770        | Op::Union { schema, .. }
771        | Op::CreateNode { schema, .. }
772        | Op::CreateRel { schema, .. }
773        | Op::Merge { schema, .. }
774        | Op::Delete { schema, .. }
775        | Op::SetProperty { schema, .. }
776        | Op::RemoveProperty { schema, .. }
777        | Op::VectorScan { schema, .. }
778        | Op::Rerank { schema, .. } => schema_width_opt(schema),
779        Op::Filter { input, .. }
780        | Op::BlockMarker { input, .. }
781        | Op::Sort { input, .. }
782        | Op::Limit { input, .. }
783        | Op::Return { input } => input_width(output_widths, *input),
784        Op::ConstRow => Some(0),
785    }
786}
787
788/// Extract all input references from an op as (field_name, index) pairs.
789fn op_input_refs(op: &Op) -> Vec<(&'static str, u32)> {
790    match op {
791        Op::ScanNodes { .. } | Op::ScanRels { .. } | Op::ConstRow => vec![],
792        Op::Expand { input, .. }
793        | Op::OptionalExpand { input, .. }
794        | Op::SemiExpand { input, .. }
795        | Op::ExpandVarLen { input, .. }
796        | Op::Filter { input, .. }
797        | Op::BlockMarker { input, .. }
798        | Op::Project { input, .. }
799        | Op::Aggregate { input, .. }
800        | Op::Sort { input, .. }
801        | Op::Limit { input, .. }
802        | Op::Unwind { input, .. }
803        | Op::PathConstruct { input, .. }
804        | Op::CreateNode { input, .. }
805        | Op::CreateRel { input, .. }
806        | Op::Merge { input, .. }
807        | Op::Delete { input, .. }
808        | Op::SetProperty { input, .. }
809        | Op::RemoveProperty { input, .. }
810        | Op::VectorScan { input, .. }
811        | Op::Rerank { input, .. }
812        | Op::Return { input } => vec![("input", *input)],
813        Op::Union { lhs, rhs, .. } => vec![("lhs", *lhs), ("rhs", *rhs)],
814    }
815}
816
817#[cfg(test)]
818mod tests {
819    use super::*;
820    use crate::{
821        current_plan_version,
822        types::{Expr, Version},
823    };
824
825    fn v() -> Version {
826        current_plan_version("test")
827    }
828
829    #[test]
830    fn valid_plan_passes() {
831        let plan = Plan {
832            version: v(),
833            ops: vec![
834                Op::ScanNodes {
835                    labels: vec![],
836                    schema: vec![],
837                    must_labels: vec![],
838                    forbidden_labels: vec![],
839                    est_rows: -1,
840                    selectivity: 1.0,
841                    graph_ref: None,
842                },
843                Op::Filter {
844                    input: 0,
845                    predicate: Expr::BoolLiteral(true),
846                },
847                Op::Return { input: 1 },
848            ],
849            root_op: 2,
850        };
851        assert!(validate_plan_structure(&plan).is_ok());
852    }
853
854    #[test]
855    fn empty_plan_rejected() {
856        let plan = Plan {
857            version: v(),
858            ops: vec![],
859            root_op: 0,
860        };
861        let errs = validate_plan_structure(&plan).unwrap_err();
862        assert!(errs
863            .iter()
864            .any(|e| matches!(e, PlanValidationError::EmptyPlan)));
865    }
866
867    #[test]
868    fn invalid_root_op_rejected() {
869        let plan = Plan {
870            version: v(),
871            ops: vec![Op::ScanNodes {
872                labels: vec![],
873                schema: vec![],
874                must_labels: vec![],
875                forbidden_labels: vec![],
876                est_rows: -1,
877                selectivity: 1.0,
878                graph_ref: None,
879            }],
880            root_op: 5,
881        };
882        let errs = validate_plan_structure(&plan).unwrap_err();
883        assert!(errs.iter().any(|e| matches!(
884            e,
885            PlanValidationError::InvalidRootOp {
886                root_op: 5,
887                num_ops: 1
888            }
889        )));
890    }
891
892    #[test]
893    fn forward_reference_rejected() {
894        let plan = Plan {
895            version: v(),
896            ops: vec![
897                Op::Filter {
898                    input: 1,
899                    predicate: Expr::BoolLiteral(true),
900                },
901                Op::ScanNodes {
902                    labels: vec![],
903                    schema: vec![],
904                    must_labels: vec![],
905                    forbidden_labels: vec![],
906                    est_rows: -1,
907                    selectivity: 1.0,
908                    graph_ref: None,
909                },
910                Op::Return { input: 0 },
911            ],
912            root_op: 2,
913        };
914        let errs = validate_plan_structure(&plan).unwrap_err();
915        assert!(errs.iter().any(|e| matches!(
916            e,
917            PlanValidationError::ForwardReference {
918                op_index: 0,
919                input: 1,
920                ..
921            }
922        )));
923    }
924
925    #[test]
926    fn self_reference_rejected() {
927        let plan = Plan {
928            version: v(),
929            ops: vec![
930                Op::ScanNodes {
931                    labels: vec![],
932                    schema: vec![],
933                    must_labels: vec![],
934                    forbidden_labels: vec![],
935                    est_rows: -1,
936                    selectivity: 1.0,
937                    graph_ref: None,
938                },
939                Op::Filter {
940                    input: 1,
941                    predicate: Expr::BoolLiteral(true),
942                },
943                Op::Return { input: 1 },
944            ],
945            root_op: 2,
946        };
947        let errs = validate_plan_structure(&plan).unwrap_err();
948        assert!(errs.iter().any(|e| matches!(
949            e,
950            PlanValidationError::ForwardReference {
951                op_index: 1,
952                input: 1,
953                ..
954            }
955        )));
956    }
957
958    #[test]
959    fn out_of_bounds_input_rejected() {
960        let plan = Plan {
961            version: v(),
962            ops: vec![
963                Op::ScanNodes {
964                    labels: vec![],
965                    schema: vec![],
966                    must_labels: vec![],
967                    forbidden_labels: vec![],
968                    est_rows: -1,
969                    selectivity: 1.0,
970                    graph_ref: None,
971                },
972                Op::Filter {
973                    input: 99,
974                    predicate: Expr::BoolLiteral(true),
975                },
976                Op::Return { input: 1 },
977            ],
978            root_op: 2,
979        };
980        let errs = validate_plan_structure(&plan).unwrap_err();
981        assert!(errs.iter().any(|e| matches!(
982            e,
983            PlanValidationError::InputOutOfBounds {
984                op_index: 1,
985                input: 99,
986                ..
987            }
988        )));
989    }
990
991    #[test]
992    fn union_validates_both_inputs() {
993        let plan = Plan {
994            version: v(),
995            ops: vec![
996                Op::ScanNodes {
997                    labels: vec![],
998                    schema: vec![],
999                    must_labels: vec![],
1000                    forbidden_labels: vec![],
1001                    est_rows: -1,
1002                    selectivity: 1.0,
1003                    graph_ref: None,
1004                },
1005                Op::Union {
1006                    lhs: 0,
1007                    rhs: 50,
1008                    all: true,
1009                    schema: vec![],
1010                },
1011                Op::Return { input: 1 },
1012            ],
1013            root_op: 2,
1014        };
1015        let errs = validate_plan_structure(&plan).unwrap_err();
1016        assert!(errs.iter().any(|e| matches!(
1017            e,
1018            PlanValidationError::InputOutOfBounds {
1019                op_index: 1,
1020                field: "rhs",
1021                input: 50,
1022                ..
1023            }
1024        )));
1025    }
1026
1027    #[test]
1028    fn rejects_invalid_graph_ref_format() {
1029        let plan = Plan {
1030            version: v(),
1031            ops: vec![
1032                Op::ScanNodes {
1033                    labels: vec![],
1034                    schema: vec![],
1035                    must_labels: vec![],
1036                    forbidden_labels: vec![],
1037                    est_rows: -1,
1038                    selectivity: 1.0,
1039                    graph_ref: Some("1bad".to_string()),
1040                },
1041                Op::Return { input: 0 },
1042            ],
1043            root_op: 1,
1044        };
1045
1046        let errs = validate_plan_structure(&plan).unwrap_err();
1047        assert!(errs.iter().any(|e| {
1048            matches!(
1049                e,
1050                PlanValidationError::InvalidGraphRef {
1051                    op_index: 0,
1052                    field: "graph_ref",
1053                    ..
1054                }
1055            )
1056        }));
1057    }
1058
1059    #[test]
1060    fn accepts_catalog_qualified_graph_ref() {
1061        // Dotted catalog-qualified names are valid graph_ref values.
1062        for name in &["catalog.main", "a.b.c", "cat.schema.social_graph"] {
1063            let plan = Plan {
1064                version: v(),
1065                ops: vec![
1066                    Op::ScanNodes {
1067                        labels: vec![],
1068                        schema: vec![],
1069                        must_labels: vec![],
1070                        forbidden_labels: vec![],
1071                        est_rows: -1,
1072                        selectivity: 1.0,
1073                        graph_ref: Some(name.to_string()),
1074                    },
1075                    Op::Return { input: 0 },
1076                ],
1077                root_op: 1,
1078            };
1079            assert!(
1080                validate_plan_structure(&plan).is_ok(),
1081                "expected valid graph_ref for: {name}"
1082            );
1083        }
1084    }
1085
1086    #[test]
1087    fn accepts_graph_param_variable_graph_ref() {
1088        // $ident-style graph parameter variables are valid graph_ref values.
1089        for name in &["$g", "$my_graph", "$db"] {
1090            let plan = Plan {
1091                version: v(),
1092                ops: vec![
1093                    Op::ScanNodes {
1094                        labels: vec![],
1095                        schema: vec![],
1096                        must_labels: vec![],
1097                        forbidden_labels: vec![],
1098                        est_rows: -1,
1099                        selectivity: 1.0,
1100                        graph_ref: Some(name.to_string()),
1101                    },
1102                    Op::Return { input: 0 },
1103                ],
1104                root_op: 1,
1105            };
1106            assert!(
1107                validate_plan_structure(&plan).is_ok(),
1108                "expected valid graph_ref for: {name}"
1109            );
1110        }
1111    }
1112
1113    #[test]
1114    fn rejects_spaced_dotted_graph_ref() {
1115        // "a . b" has spaces around the dot and must be rejected.
1116        let plan = Plan {
1117            version: v(),
1118            ops: vec![
1119                Op::ScanNodes {
1120                    labels: vec![],
1121                    schema: vec![],
1122                    must_labels: vec![],
1123                    forbidden_labels: vec![],
1124                    est_rows: -1,
1125                    selectivity: 1.0,
1126                    graph_ref: Some("catalog . main".to_string()),
1127                },
1128                Op::Return { input: 0 },
1129            ],
1130            root_op: 1,
1131        };
1132        assert!(validate_plan_structure(&plan).is_err());
1133    }
1134
1135    #[test]
1136    fn multi_graph_refs_pass_structural_validation() {
1137        // Multi-graph plans are structurally valid; capability enforcement is
1138        // the engine's responsibility, not the structural validator's.
1139        let plan = Plan {
1140            version: v(),
1141            ops: vec![
1142                Op::ScanNodes {
1143                    labels: vec![],
1144                    schema: vec![],
1145                    must_labels: vec![],
1146                    forbidden_labels: vec![],
1147                    est_rows: -1,
1148                    selectivity: 1.0,
1149                    graph_ref: Some("g1".to_string()),
1150                },
1151                Op::Expand {
1152                    input: 0,
1153                    src_col: 0,
1154                    types: vec!["KNOWS".to_string()],
1155                    dir: crate::types::ExpandDir::Out,
1156                    schema: vec![],
1157                    src_var: "n".to_string(),
1158                    rel_var: "r".to_string(),
1159                    dst_var: "m".to_string(),
1160                    legal_src_labels: vec![],
1161                    legal_dst_labels: vec![],
1162                    est_degree: -1.0,
1163                    graph_ref: Some("g2".to_string()),
1164                },
1165                Op::Return { input: 1 },
1166            ],
1167            root_op: 2,
1168        };
1169
1170        assert!(validate_plan_structure(&plan).is_ok());
1171    }
1172
1173    #[test]
1174    fn const_row_has_zero_output_width() {
1175        let plan = Plan {
1176            version: v(),
1177            ops: vec![Op::ConstRow, Op::Return { input: 0 }],
1178            root_op: 1,
1179        };
1180        assert!(validate_plan_structure(&plan).is_ok());
1181    }
1182
1183    #[test]
1184    fn const_row_followed_by_create_node_is_valid() {
1185        use crate::types::{ColDef, ColKind, LogicalType};
1186        let plan = Plan {
1187            version: v(),
1188            ops: vec![
1189                Op::ConstRow,
1190                Op::CreateNode {
1191                    input: 0,
1192                    labels: vec!["Person".to_string()],
1193                    props: Expr::NullLiteral,
1194                    schema: vec![ColDef {
1195                        name: "n".to_string(),
1196                        kind: ColKind::Node,
1197                        logical_type: LogicalType::NodeRef,
1198                    }],
1199                    out_var: "n".to_string(),
1200                },
1201                Op::Return { input: 1 },
1202            ],
1203            root_op: 2,
1204        };
1205        assert!(validate_plan_structure(&plan).is_ok());
1206    }
1207
1208    // ── Schema kind checks (Phase 12.4) ────────────────────────────────────
1209
1210    fn node_col(name: &str) -> ColDef {
1211        use crate::types::{ColKind, LogicalType};
1212        ColDef {
1213            name: name.to_string(),
1214            kind: ColKind::Node,
1215            logical_type: LogicalType::NodeRef,
1216        }
1217    }
1218
1219    fn int_col(name: &str) -> ColDef {
1220        use crate::types::{ColKind, LogicalType};
1221        ColDef {
1222            name: name.to_string(),
1223            kind: ColKind::Int64,
1224            logical_type: LogicalType::Int64,
1225        }
1226    }
1227
1228    fn rel_col(name: &str) -> ColDef {
1229        use crate::types::{ColKind, LogicalType};
1230        ColDef {
1231            name: name.to_string(),
1232            kind: ColKind::Rel,
1233            logical_type: LogicalType::RelRef,
1234        }
1235    }
1236
1237    #[test]
1238    fn expand_src_col_must_be_node_kind() {
1239        // Expand.src_col references an Int64 column — should be rejected.
1240        let plan = Plan {
1241            version: v(),
1242            ops: vec![
1243                Op::ScanNodes {
1244                    labels: vec![],
1245                    schema: vec![int_col("x")], // Int64, not Node
1246                    must_labels: vec![],
1247                    forbidden_labels: vec![],
1248                    est_rows: -1,
1249                    selectivity: 1.0,
1250                    graph_ref: None,
1251                },
1252                Op::Expand {
1253                    input: 0,
1254                    src_col: 0, // references col 0 which is Int64
1255                    types: vec!["KNOWS".to_string()],
1256                    dir: crate::types::ExpandDir::Out,
1257                    schema: vec![node_col("n"), rel_col("r"), node_col("m")],
1258                    src_var: "n".to_string(),
1259                    rel_var: "r".to_string(),
1260                    dst_var: "m".to_string(),
1261                    legal_src_labels: vec![],
1262                    legal_dst_labels: vec![],
1263                    est_degree: -1.0,
1264                    graph_ref: None,
1265                },
1266                Op::Return { input: 1 },
1267            ],
1268            root_op: 2,
1269        };
1270        let errs = validate_plan_structure(&plan).unwrap_err();
1271        assert!(
1272            errs.iter().any(|e| matches!(
1273                e,
1274                PlanValidationError::SchemaKindMismatch {
1275                    op_index: 1,
1276                    field: "src_col",
1277                    ..
1278                }
1279            )),
1280            "expected SchemaKindMismatch for src_col, got: {errs:?}"
1281        );
1282    }
1283
1284    #[test]
1285    fn expand_with_node_src_col_passes() {
1286        let plan = Plan {
1287            version: v(),
1288            ops: vec![
1289                Op::ScanNodes {
1290                    labels: vec![],
1291                    schema: vec![node_col("n")],
1292                    must_labels: vec![],
1293                    forbidden_labels: vec![],
1294                    est_rows: -1,
1295                    selectivity: 1.0,
1296                    graph_ref: None,
1297                },
1298                Op::Expand {
1299                    input: 0,
1300                    src_col: 0,
1301                    types: vec!["KNOWS".to_string()],
1302                    dir: crate::types::ExpandDir::Out,
1303                    schema: vec![node_col("n"), rel_col("r"), node_col("m")],
1304                    src_var: "n".to_string(),
1305                    rel_var: "r".to_string(),
1306                    dst_var: "m".to_string(),
1307                    legal_src_labels: vec![],
1308                    legal_dst_labels: vec![],
1309                    est_degree: -1.0,
1310                    graph_ref: None,
1311                },
1312                Op::Return { input: 1 },
1313            ],
1314            root_op: 2,
1315        };
1316        assert!(validate_plan_structure(&plan).is_ok());
1317    }
1318
1319    #[test]
1320    fn create_rel_src_col_must_be_node_kind() {
1321        let plan = Plan {
1322            version: v(),
1323            ops: vec![
1324                Op::ScanNodes {
1325                    labels: vec![],
1326                    schema: vec![node_col("n"), int_col("bad")],
1327                    must_labels: vec![],
1328                    forbidden_labels: vec![],
1329                    est_rows: -1,
1330                    selectivity: 1.0,
1331                    graph_ref: None,
1332                },
1333                Op::CreateRel {
1334                    input: 0,
1335                    src_col: 0,
1336                    dst_col: 1, // Int64, not Node — should fail
1337                    rel_type: "KNOWS".to_string(),
1338                    props: Expr::NullLiteral,
1339                    schema: vec![rel_col("r")],
1340                    out_var: "r".to_string(),
1341                },
1342                Op::Return { input: 1 },
1343            ],
1344            root_op: 2,
1345        };
1346        let errs = validate_plan_structure(&plan).unwrap_err();
1347        assert!(
1348            errs.iter().any(|e| matches!(
1349                e,
1350                PlanValidationError::SchemaKindMismatch {
1351                    op_index: 1,
1352                    field: "dst_col",
1353                    ..
1354                }
1355            )),
1356            "expected SchemaKindMismatch for dst_col, got: {errs:?}"
1357        );
1358    }
1359
1360    #[test]
1361    fn delete_target_col_must_be_node_or_rel() {
1362        let plan = Plan {
1363            version: v(),
1364            ops: vec![
1365                Op::ScanNodes {
1366                    labels: vec![],
1367                    schema: vec![int_col("x")], // Int64 — not valid for delete
1368                    must_labels: vec![],
1369                    forbidden_labels: vec![],
1370                    est_rows: -1,
1371                    selectivity: 1.0,
1372                    graph_ref: None,
1373                },
1374                Op::Delete {
1375                    input: 0,
1376                    target_col: 0,
1377                    detach: false,
1378                    schema: vec![int_col("x")],
1379                },
1380                Op::Return { input: 1 },
1381            ],
1382            root_op: 2,
1383        };
1384        let errs = validate_plan_structure(&plan).unwrap_err();
1385        assert!(
1386            errs.iter().any(|e| matches!(
1387                e,
1388                PlanValidationError::SchemaKindMismatch {
1389                    op_index: 1,
1390                    field: "target_col",
1391                    ..
1392                }
1393            )),
1394            "expected SchemaKindMismatch for target_col, got: {errs:?}"
1395        );
1396    }
1397
1398    #[test]
1399    fn set_property_with_rel_target_passes() {
1400        let plan = Plan {
1401            version: v(),
1402            ops: vec![
1403                Op::ScanNodes {
1404                    labels: vec![],
1405                    schema: vec![rel_col("r")],
1406                    must_labels: vec![],
1407                    forbidden_labels: vec![],
1408                    est_rows: -1,
1409                    selectivity: 1.0,
1410                    graph_ref: None,
1411                },
1412                Op::SetProperty {
1413                    input: 0,
1414                    target_col: 0,
1415                    key: "name".to_string(),
1416                    value_expr: Expr::StringLiteral("Alice".to_string()),
1417                    schema: vec![rel_col("r")],
1418                },
1419                Op::Return { input: 1 },
1420            ],
1421            root_op: 2,
1422        };
1423        assert!(validate_plan_structure(&plan).is_ok());
1424    }
1425
1426    #[test]
1427    fn rejects_vector_scan_with_zero_top_k() {
1428        let plan = Plan {
1429            version: v(),
1430            ops: vec![
1431                Op::ConstRow,
1432                Op::VectorScan {
1433                    input: 0,
1434                    collection: "embeddings".to_string(),
1435                    query_vector: Expr::ListLiteral { items: vec![] },
1436                    metric: crate::types::VectorMetric::Cosine,
1437                    top_k: 0,
1438                    approx_hint: true,
1439                    schema: vec![],
1440                },
1441                Op::Return { input: 1 },
1442            ],
1443            root_op: 2,
1444        };
1445
1446        let errs = validate_plan_structure(&plan).unwrap_err();
1447        assert!(errs.iter().any(|e| {
1448            matches!(
1449                e,
1450                PlanValidationError::NonPositiveValue {
1451                    op_index: 1,
1452                    field: "top_k",
1453                    value: 0
1454                }
1455            )
1456        }));
1457    }
1458
1459    #[test]
1460    fn rejects_rerank_with_zero_top_k() {
1461        let plan = Plan {
1462            version: v(),
1463            ops: vec![
1464                Op::ConstRow,
1465                Op::Rerank {
1466                    input: 0,
1467                    score_expr: Expr::NullLiteral,
1468                    top_k: 0,
1469                    schema: vec![],
1470                },
1471                Op::Return { input: 1 },
1472            ],
1473            root_op: 2,
1474        };
1475        let errs = validate_plan_structure(&plan).unwrap_err();
1476        assert!(errs.iter().any(|e| {
1477            matches!(
1478                e,
1479                PlanValidationError::NonPositiveValue {
1480                    op_index: 1,
1481                    field: "top_k",
1482                    value: 0
1483                }
1484            )
1485        }));
1486    }
1487}