Skip to main content

helios_persistence/sof/
compile_view.rs

1//! ViewDefinition JSON → [`PlanNode`] compiler.
2//!
3//! Walks the SoF `select` tree producing a plan tree rooted in a
4//! [`PlanNode::Scan`] over `resources`. Per-clause logic:
5//!
6//! - Plain `select.column[]` → column projections off the current focus.
7//! - `forEach`/`forEachOrNull` → [`PlanNode::LateralUnnest`] over the parent.
8//! - Nested `select[]` → contributes additional columns under the parent's
9//!   focus (or extends the row source if it has its own `forEach`).
10//! - `unionAll[]` → [`PlanNode::Union`] with sibling column[] merged into
11//!   each branch.
12//! - Top-level `where[].path` → [`PlanNode::Filter`] applied to the root scan.
13//!
14//! Stages 4–5 add chained-call collection threading, repeat:, and boundary
15//! functions.
16
17use helios_fhir::FhirVersion;
18use helios_sof::ConstantValue;
19use serde_json::Value;
20
21use crate::core::sof_runner::SofError;
22
23use super::compile_path::{CompileEnv, Constant, compile_fhirpath_expr};
24use super::ir::{Column, LitValue, PathStep, PlanNode, SqlExpr, SqlType};
25
26const ROOT_ALIAS: &str = "r";
27const FOREACH_ALIAS_PREFIX: &str = "fe";
28
29/// Build a plan tree for the given ViewDefinition JSON.
30///
31/// Returns the plan plus the resolved `ViewDefinition.constant[]` values in
32/// the order they were bound to SQL parameter slots. The runners append them
33/// after `tenant_id` / `resource_type`.
34///
35/// The `dialect` parameter is currently used only by the trailing-`[N]`
36/// forEach lowering ([`build_degenerate_chain_sql`]) which builds a SQL
37/// chain string at compile time. Other features lower through the
38/// dialect-aware emit path.
39pub fn build_plan(
40    view_json: &Value,
41    dialect: &dyn super::dialect::Dialect,
42    target: super::compiler::CompileTarget,
43    fhir_version: FhirVersion,
44) -> Result<(PlanNode, Vec<LitValue>), SofError> {
45    let resource_type = view_json
46        .get("resource")
47        .and_then(|v| v.as_str())
48        .filter(|s| !s.is_empty())
49        .ok_or_else(|| {
50            SofError::InvalidViewDefinition("ViewDefinition.resource is required".to_string())
51        })?
52        .to_string();
53
54    let selects = view_json
55        .get("select")
56        .and_then(|v| v.as_array())
57        .ok_or_else(|| {
58            SofError::InvalidViewDefinition(
59                "ViewDefinition.select must be a non-null array".to_string(),
60            )
61        })?;
62    if selects.is_empty() {
63        return Err(SofError::InvalidViewDefinition(
64            "ViewDefinition.select must have at least one clause".to_string(),
65        ));
66    }
67
68    let mut env = CompileEnv::new_for_resource(
69        format!("{ROOT_ALIAS}.data"),
70        resource_type.clone(),
71        fhir_version,
72    );
73    populate_constants(view_json, &mut env)?;
74
75    // Top-level where filters apply to the resource row, before any unnest.
76    let mut where_predicates: Vec<SqlExpr> = Vec::new();
77    if let Some(wheres) = view_json.get("where").and_then(|v| v.as_array()) {
78        for w in wheres {
79            if let Some(path) = w.get("path").and_then(|v| v.as_str()) {
80                // SoF v2 spec: where[].path must resolve to a boolean. A
81                // plain field-navigation expression with no operators or
82                // function calls is provably non-boolean — reject at
83                // compile time so views like `where: [{path: "name.family"}]`
84                // don't silently misbehave.
85                if where_path_is_provably_non_boolean(path) {
86                    return Err(SofError::InvalidViewDefinition(format!(
87                        "ViewDefinition.where[].path '{path}' must resolve to a \
88                         boolean (got a plain navigation expression)"
89                    )));
90                }
91                let pred = compile_fhirpath_expr(path, &mut env)?;
92                where_predicates.push(pred);
93            }
94        }
95    }
96
97    let scan = PlanNode::Scan {
98        alias: ROOT_ALIAS.to_string(),
99        resource_type: resource_type.clone(),
100    };
101    let mut root_plan = scan;
102    for pred in where_predicates {
103        root_plan = PlanNode::Filter {
104            parent: Box::new(root_plan),
105            predicate: pred,
106        };
107    }
108
109    let mut alias_seq = AliasSeq::new();
110    let plan = plan_clause_list(
111        selects,
112        &root_plan,
113        &format!("{ROOT_ALIAS}.data"),
114        &mut env,
115        &mut alias_seq,
116        dialect,
117        target,
118    )
119    .and_then(ensure_project)?;
120    Ok((plan, env.param_bindings))
121}
122
123/// Reads `ViewDefinition.constant[]` and populates `env.constants` with typed
124/// values. Each entry must have a `name` and exactly one `valueX` field per
125/// the SoF v2 spec. Delegates the field walk to
126/// [`helios_sof::parse_constant_from_json`] so the spec field list lives in
127/// one place; here we just lift the neutral [`ConstantValue`] into the
128/// compiler's [`LitValue`] (which keeps dates/times as text — FHIRPath
129/// `@`/`@T` prefixing only matters for the in-process evaluator).
130fn populate_constants(view_json: &Value, env: &mut CompileEnv) -> Result<(), SofError> {
131    let Some(constants) = view_json.get("constant").and_then(|v| v.as_array()) else {
132        return Ok(());
133    };
134    for c in constants {
135        let (name, value) = helios_sof::parse_constant_from_json(c).map_err(lift_sof_error)?;
136        env.constants.insert(
137            name,
138            Constant {
139                value: lit_value_from_constant(value),
140                bound_to: None,
141            },
142        );
143    }
144    Ok(())
145}
146
147/// Lowers a neutral [`ConstantValue`] into the in-DB compiler's [`LitValue`].
148/// All FHIR string-shaped primitives collapse to `Str`; the date/time/instant
149/// families keep their lexical form (no `@`-prefixing — SQL parameter binding
150/// takes plain ISO 8601 strings).
151fn lit_value_from_constant(value: ConstantValue) -> LitValue {
152    match value {
153        ConstantValue::String(s)
154        | ConstantValue::Code(s)
155        | ConstantValue::Identifier(s)
156        | ConstantValue::Base64Binary(s)
157        | ConstantValue::Markdown(s)
158        | ConstantValue::Date(s)
159        | ConstantValue::DateTime(s)
160        | ConstantValue::Time(s)
161        | ConstantValue::Instant(s) => LitValue::Str(s),
162        ConstantValue::Boolean(b) => LitValue::Bool(b),
163        ConstantValue::Integer(i)
164        | ConstantValue::PositiveInt(i)
165        | ConstantValue::UnsignedInt(i)
166        | ConstantValue::Integer64(i) => LitValue::Int(i),
167        ConstantValue::Decimal(s) => LitValue::Decimal(s),
168    }
169}
170
171/// Maps `helios_sof::SofError` (raised by the shared SoF spec parser) onto
172/// the persistence crate's local `SofError`. Only `InvalidViewDefinition`
173/// is reachable from the parser today; other variants pass through as the
174/// same flavour to keep the 422-mapping consistent.
175fn lift_sof_error(e: helios_sof::SofError) -> SofError {
176    match e {
177        helios_sof::SofError::InvalidViewDefinition(msg) => SofError::InvalidViewDefinition(msg),
178        other => SofError::InvalidViewDefinition(other.to_string()),
179    }
180}
181
182/// Walks a list of select clauses sharing a parent row source. Builds either
183/// a single `Project` (one row per parent row) or a `Union` of Projects (when
184/// any clause is `unionAll`).
185fn plan_clause_list(
186    clauses: &[Value],
187    parent_plan: &PlanNode,
188    parent_focus: &str,
189    env: &mut CompileEnv,
190    alias_seq: &mut AliasSeq,
191    dialect: &dyn super::dialect::Dialect,
192    target: super::compiler::CompileTarget,
193) -> Result<PlanNode, SofError> {
194    // Single-pass: collect sibling root columns + at most one `forEach` per
195    // level + handle a single `unionAll` clause. Multiple unionAll clauses at
196    // the same level are not exercised by the corpus.
197    let mut shared_columns: Vec<Column> = Vec::new();
198    let mut shared_unnests: Vec<UnnestStep> = Vec::new();
199    let mut shared_recurse: Option<RecurseInfo> = None;
200    let mut union_branches: Option<&Vec<Value>> = None;
201
202    for clause in clauses {
203        if let Some(branches) = clause.get("unionAll").and_then(|v| v.as_array()) {
204            if union_branches.is_some() {
205                return Err(SofError::Uncompilable {
206                    reason: "multiple unionAll clauses at the same level are not supported"
207                        .to_string(),
208                });
209            }
210            if branches.is_empty() {
211                return Err(SofError::InvalidViewDefinition(
212                    "unionAll branches list is empty".to_string(),
213                ));
214            }
215            union_branches = Some(branches);
216            // Sibling columns/forEach in this same clause are merged into
217            // every branch (handled below).
218            let parts = read_clause_columns_and_iter(
219                clause,
220                parent_focus,
221                env,
222                alias_seq,
223                dialect,
224                target,
225            )?;
226            shared_columns.extend(parts.columns);
227            shared_unnests.extend(parts.unnests);
228            continue;
229        }
230
231        let parts =
232            read_clause_columns_and_iter(clause, parent_focus, env, alias_seq, dialect, target)?;
233        if let Some(rec) = parts.recurse {
234            if shared_recurse.is_some() {
235                return Err(SofError::Uncompilable {
236                    reason: "multiple repeat clauses at the same level are not supported"
237                        .to_string(),
238                });
239            }
240            shared_recurse = Some(rec);
241        }
242        shared_columns.extend(parts.columns);
243        shared_unnests.extend(parts.unnests);
244    }
245
246    // No unionAll → single Project, possibly under a chain of LATERAL unnests
247    // or wrapping a recursive descent.
248    let Some(branches) = union_branches else {
249        if shared_columns.is_empty() {
250            return Err(SofError::InvalidViewDefinition(
251                "no columns found in select clauses".to_string(),
252            ));
253        }
254        let mut plan = parent_plan.clone();
255        if let Some(rec) = shared_recurse {
256            // Recurse first, then apply any nested forEach unnests on top so
257            // `repeat:[item]` with a nested `forEach: "answer"` joins each
258            // visited node against its answer array.
259            plan = PlanNode::Recurse {
260                parent: Box::new(plan),
261                seed: SqlExpr::Lit(LitValue::Null), // unused; emitter walks parent
262                step_paths: rec.step_paths,
263                out_alias: rec.out_alias,
264            };
265            plan = apply_unnests(plan, &shared_unnests);
266        } else {
267            plan = apply_unnests(plan, &shared_unnests);
268        }
269        return Ok(PlanNode::Project {
270            parent: Box::new(plan),
271            columns: shared_columns,
272        });
273    };
274
275    if shared_recurse.is_some() {
276        return Err(SofError::Uncompilable {
277            reason: "select.repeat combined with sibling unionAll is not yet supported".to_string(),
278        });
279    }
280
281    // Flatten nested `unionAll` clauses one level deep — a branch whose only
282    // content is another `unionAll` array expands to its inner branches.
283    let flat_branches = flatten_union_branches(branches);
284
285    // The branches must read against the focus produced by the shared
286    // unnests (e.g. when the unionAll lives inside a `forEach: "contact"`
287    // clause, each branch's paths resolve relative to the contact iteration
288    // alias, not the resource document).
289    let branch_focus = shared_unnests
290        .last()
291        .map(|u| format!("{}.value", u.out_alias))
292        .unwrap_or_else(|| parent_focus.to_string());
293
294    // unionAll → one Project per branch, sibling cols/unnests merged in,
295    // wrapped in a Union.
296    let mut branch_plans: Vec<PlanNode> = Vec::with_capacity(flat_branches.len());
297    for branch in &flat_branches {
298        let parts =
299            read_clause_columns_and_iter(branch, &branch_focus, env, alias_seq, dialect, target)?;
300        // A unionAll branch may itself carry a `repeat:` clause — wrap that
301        // branch's plan in a Recurse and let the per-branch Project read off
302        // the recursive CTE alias.
303        let mut branch_plan = if let Some(rec) = parts.recurse {
304            if !shared_unnests.is_empty() || !parts.unnests.is_empty() {
305                return Err(SofError::Uncompilable {
306                    reason: "select.repeat inside a unionAll branch combined with forEach is \
307                             not yet supported"
308                        .to_string(),
309                });
310            }
311            PlanNode::Recurse {
312                parent: Box::new(parent_plan.clone()),
313                seed: SqlExpr::Lit(LitValue::Null),
314                step_paths: rec.step_paths,
315                out_alias: rec.out_alias,
316            }
317        } else {
318            // Each branch projection: parent's `where`-filtered scan + sibling
319            // unnests + this branch's unnests; columns = sibling cols + branch cols.
320            let mut combined_unnests = shared_unnests.clone();
321            combined_unnests.extend(parts.unnests);
322            apply_unnests(parent_plan.clone(), &combined_unnests)
323        };
324        // Apply per-branch extra filter (e.g. EXISTS-from-chain emitted by
325        // trailing-`[N]` forEach lowering to drop resources whose flattened
326        // chain returns no rows).
327        if let Some(filter) = parts.extra_filter {
328            branch_plan = PlanNode::Filter {
329                parent: Box::new(branch_plan),
330                predicate: filter,
331            };
332        }
333
334        let mut combined_cols = shared_columns.clone();
335        combined_cols.extend(parts.columns);
336        if combined_cols.is_empty() {
337            return Err(SofError::InvalidViewDefinition(
338                "unionAll branch produced no output columns".to_string(),
339            ));
340        }
341        branch_plans.push(PlanNode::Project {
342            parent: Box::new(branch_plan),
343            columns: combined_cols,
344        });
345    }
346
347    Ok(PlanNode::Union(branch_plans))
348}
349
350/// Flattens nested `unionAll` clauses one level deep — a branch whose only
351/// content is another `unionAll` array expands to its inner branches. The
352/// SoF v2 spec treats nested unionAll as semantically equivalent to a single
353/// flat list, so the compiler can simplify before plan assembly.
354fn flatten_union_branches(branches: &[Value]) -> Vec<Value> {
355    let mut out: Vec<Value> = Vec::new();
356    for b in branches {
357        if let Some(inner) = b.get("unionAll").and_then(|v| v.as_array())
358            && b.as_object().map(|o| o.len() == 1).unwrap_or(false)
359        {
360            out.extend(flatten_union_branches(inner));
361        } else {
362            out.push(b.clone());
363        }
364    }
365    out
366}
367
368/// One LATERAL unnest step in the chain extending a parent plan.
369#[derive(Debug, Clone)]
370struct UnnestStep {
371    source: SqlExpr,
372    out_alias: String,
373    left_join: bool,
374    /// Optional filter applied in the JOIN ON clause — used by forEach paths
375    /// that contain a `where(crit)` (e.g. `forEach: "name.where(use=X)"`).
376    /// The predicate is pre-lowered against `<out_alias>.value`.
377    on_filter: Option<SqlExpr>,
378    /// When set, restricts the unnest to the Nth element (zero-based) of the
379    /// flattened collection. Used for forEach paths ending in `[N]` —
380    /// FHIRPath indexes the flattened result, not each array crossing.
381    flat_index: Option<i64>,
382}
383
384/// One `repeat:` recursive descent — produces a recursive-CTE row source
385/// rather than a chain of lateral unnests.
386#[derive(Debug, Clone)]
387struct RecurseInfo {
388    /// Step paths to walk on each iteration (`r.data` for the seed,
389    /// `<alias>.node` for subsequent levels).
390    step_paths: Vec<super::ir::JsonPath>,
391    /// Alias of the recursive CTE (also the column alias for `node`).
392    out_alias: String,
393}
394
395/// Output of [`read_clause_columns_and_iter`]: the columns this clause
396/// contributes plus any unnests / recurse it adds to the row source.
397#[derive(Debug)]
398struct ClauseParts {
399    columns: Vec<Column>,
400    unnests: Vec<UnnestStep>,
401    recurse: Option<RecurseInfo>,
402    /// Extra per-branch filter applied as `Filter(parent, predicate)`.
403    /// Set by trailing-`[N]` forEach lowering to drop resources whose
404    /// flattened chain returns fewer than `N+1` elements.
405    extra_filter: Option<SqlExpr>,
406}
407
408/// Reads a single (non-unionAll) clause: its `forEach[OrNull]`, `column[]`,
409/// and any nested `select[]` clauses. Nested clauses contribute columns at
410/// the same focus (or extend the row source if they themselves have a
411/// forEach).
412fn read_clause_columns_and_iter(
413    clause: &Value,
414    parent_focus: &str,
415    env: &mut CompileEnv,
416    alias_seq: &mut AliasSeq,
417    dialect: &dyn super::dialect::Dialect,
418    target: super::compiler::CompileTarget,
419) -> Result<ClauseParts, SofError> {
420    // `repeat:` is mutually exclusive with `forEach`/`forEachOrNull`.
421    if let Some(repeat) = clause.get("repeat").and_then(|v| v.as_array()) {
422        if repeat.is_empty() {
423            return Err(SofError::InvalidViewDefinition(
424                "ViewDefinition select.repeat must contain at least one path".to_string(),
425            ));
426        }
427        if clause.get("forEach").is_some() || clause.get("forEachOrNull").is_some() {
428            return Err(SofError::Uncompilable {
429                reason: "select.repeat combined with forEach is not yet supported".to_string(),
430            });
431        }
432        let mut step_paths: Vec<super::ir::JsonPath> = Vec::with_capacity(repeat.len());
433        for p in repeat {
434            let s = p.as_str().ok_or_else(|| {
435                SofError::InvalidViewDefinition("select.repeat entries must be strings".to_string())
436            })?;
437            let prev_root = env.root_alias.clone();
438            env.root_alias = parent_focus.to_string();
439            let expr = compile_fhirpath_expr(s, env)?;
440            env.root_alias = prev_root;
441            match expr {
442                SqlExpr::JsonPath { path, .. } => step_paths.push(path),
443                _ => {
444                    return Err(SofError::Uncompilable {
445                        reason: format!("repeat path '{s}' must be a simple JSON path"),
446                    });
447                }
448            }
449        }
450        let alias = alias_seq.next_recurse();
451        let focus = format!("{alias}.node");
452        let mut columns = read_columns(clause, &focus, env)?;
453        // Nested `select[]` under `repeat:` may add columns at the recursive
454        // node focus AND/OR extend the row source via a forEach (e.g.
455        // `repeat:[item]` with a nested `forEach: "answer"` projects answer
456        // rows). Each nested forEach's unnests get hoisted onto the
457        // post-recurse plan; nested repeats are rejected.
458        let mut nested_unnests: Vec<UnnestStep> = Vec::new();
459        if let Some(nested) = clause.get("select").and_then(|v| v.as_array()) {
460            for sub in nested {
461                let sub_parts =
462                    read_clause_columns_and_iter(sub, &focus, env, alias_seq, dialect, target)?;
463                if sub_parts.recurse.is_some() {
464                    return Err(SofError::Uncompilable {
465                        reason: "select.repeat with nested repeat is not yet supported".to_string(),
466                    });
467                }
468                nested_unnests.extend(sub_parts.unnests);
469                columns.extend(sub_parts.columns);
470            }
471        }
472        return Ok(ClauseParts {
473            columns,
474            unnests: nested_unnests,
475            recurse: Some(RecurseInfo {
476                step_paths,
477                out_alias: alias,
478            }),
479            extra_filter: None,
480        });
481    }
482
483    let for_each_expr = clause
484        .get("forEach")
485        .and_then(|v| v.as_str())
486        .map(String::from);
487    let for_each_or_null_expr = clause
488        .get("forEachOrNull")
489        .and_then(|v| v.as_str())
490        .map(String::from);
491
492    let iter_path_src = for_each_expr.or(for_each_or_null_expr.clone());
493    let is_left_join = for_each_or_null_expr.is_some();
494
495    let (mut unnests, focus): (Vec<UnnestStep>, String) = if let Some(src) = iter_path_src {
496        // Detect a trailing `where(crit)` on the forEach path
497        // (`forEach: "name.where(use = X)"`). The criterion is lifted into
498        // the JOIN ON clause of the last lateral unnest so the iteration
499        // skips non-matching elements (and `forEachOrNull` keeps left-join
500        // semantics — preserving outer rows when no element matches).
501        let (path_src, where_crit_src): (String, Option<String>) =
502            split_trailing_where(&src).unwrap_or((src.clone(), None));
503
504        let prev_root = env.root_alias.clone();
505        env.root_alias = parent_focus.to_string();
506        let path_expr = compile_fhirpath_expr(&path_src, env)?;
507        env.root_alias = prev_root;
508        let path = match path_expr {
509            SqlExpr::JsonPath { path, .. } => path,
510            _ => {
511                return Err(SofError::Uncompilable {
512                    reason: format!("forEach path '{src}' must be a simple JSON path"),
513                });
514            }
515        };
516        // FHIRPath `[N]` indexes the flattened collection result, not each
517        // individual array crossing. SQLite forbids correlated subqueries in
518        // FROM, so trailing-Index forEach paths short-circuit into a
519        // *degenerate* iteration: no unnest in the FROM, each column wrapped
520        // in a correlated `ScalarFromChain` subquery in the SELECT.
521        let trailing_index = match path.0.last() {
522            Some(super::ir::PathStep::Index(n)) if path.0.len() > 1 => Some(*n),
523            _ => None,
524        };
525        // SQL targets lower trailing-`[N]` forEach into a correlated
526        // `ScalarFromChain` subquery (SQLite forbids correlated FROM
527        // subqueries). Targets without that constraint — MongoDB — instead
528        // fall through to the normal unnest path below, which tags the last
529        // unnest with `flat_index` for the emitter to lower to `$arrayElemAt`.
530        if let Some(idx) = trailing_index
531            && target.supports_correlated_from_subqueries()
532        {
533            let trimmed_path = super::ir::JsonPath(path.0[..path.0.len() - 1].to_vec());
534            let segments = split_path_into_segments(&trimmed_path);
535            let (chain_sql, deepest_alias) =
536                build_degenerate_chain_sql(&segments, parent_focus, alias_seq, dialect);
537            let column_focus = format!("{deepest_alias}.value");
538            let raw_columns = read_columns(clause, &column_focus, env)?;
539            // Wrap every column in a correlated scalar subquery. The
540            // outer SELECT sees one row per resource; the column projects
541            // the [N]-th element of the flattened chain (or NULL).
542            let columns: Vec<Column> = raw_columns
543                .into_iter()
544                .map(|c| Column {
545                    name: c.name,
546                    expr: SqlExpr::ScalarFromChain {
547                        chain_sql: chain_sql.clone(),
548                        projection: Box::new(c.expr),
549                        offset: idx,
550                    },
551                    collection: c.collection,
552                    ty: c.ty,
553                })
554                .collect();
555            // For `forEach` (not `forEachOrNull`), an empty chain means
556            // the resource produces NO row. Surface that as a per-branch
557            // EXISTS filter — wraps the branch's plan with `Filter(EXISTS
558            // (SELECT 1 FROM <chain> LIMIT 1 OFFSET <idx>))`.
559            let extra_filter = if is_left_join {
560                None
561            } else {
562                Some(SqlExpr::ScalarFromChain {
563                    chain_sql: chain_sql.clone(),
564                    projection: Box::new(SqlExpr::Lit(LitValue::Int(1))),
565                    offset: idx,
566                })
567            };
568            return Ok(ClauseParts {
569                columns,
570                unnests: Vec::new(),
571                recurse: None,
572                extra_filter,
573            });
574        }
575        // FHIRPath flattens through array boundaries automatically — emit
576        // one lateral unnest per `Field` step so `forEach: "contact.telecom"`
577        // produces one row per inner element. `Index` steps stay attached to
578        // the prior segment as plain navigation. Only the LAST `forEach`
579        // step uses LEFT JOIN for `forEachOrNull` so missing intermediate
580        // levels still drop the row (matching the FHIRPath empty-collection
581        // semantics).
582        let mut unnests: Vec<UnnestStep> = Vec::new();
583        let mut focus = parent_focus.to_string();
584        // When a trailing `[N]` is present (the non-SQL fall-through, e.g.
585        // MongoDB), drop it from the unnest segments and apply it as
586        // `flat_index` below — otherwise the segment navigation would index
587        // `[N]` and `flat_index` would index it a second time.
588        let unnest_path = if trailing_index.is_some() {
589            super::ir::JsonPath(path.0[..path.0.len() - 1].to_vec())
590        } else {
591            path.clone()
592        };
593        let segments = split_path_into_segments(&unnest_path);
594        let last_idx = segments.len().saturating_sub(1);
595        for (i, seg_path) in segments.into_iter().enumerate() {
596            let alias = alias_seq.next();
597            let source = SqlExpr::JsonPath {
598                root: focus.clone(),
599                path: seg_path,
600            };
601            // Compile the trailing `where(crit)` filter against the LAST
602            // unnest's iteration alias, so `name.where(use=X)` filters the
603            // expanded `name` rows.
604            let on_filter = if i == last_idx {
605                if let Some(ref crit_src) = where_crit_src {
606                    let prev_root = env.root_alias.clone();
607                    env.root_alias = format!("{alias}.value");
608                    let pred = compile_fhirpath_expr(crit_src, env);
609                    env.root_alias = prev_root;
610                    Some(pred?)
611                } else {
612                    None
613                }
614            } else {
615                None
616            };
617            unnests.push(UnnestStep {
618                source,
619                out_alias: alias.clone(),
620                left_join: is_left_join && i == last_idx,
621                on_filter,
622                flat_index: None,
623            });
624            focus = format!("{alias}.value");
625        }
626        // Apply trailing `[N]` semantics by tagging the LAST unnest with a
627        // limit/offset; the emitter wraps that unnest in a `LIMIT 1 OFFSET N`
628        // subquery so only the Nth element of the flattened collection is
629        // iterated.
630        if let Some(n) = trailing_index
631            && let Some(last) = unnests.last_mut()
632        {
633            last.flat_index = Some(n);
634        }
635        (unnests, focus)
636    } else {
637        (Vec::new(), parent_focus.to_string())
638    };
639
640    let mut columns = read_columns(clause, &focus, env)?;
641
642    // Nested select clauses: each contributes additional columns under the
643    // current focus. If a nested clause has its own forEach we extend the
644    // unnest chain; deeper unionAll inside nested select[] is not supported
645    // until a real-world conformance case demands it (corpus doesn't).
646    if let Some(nested) = clause.get("select").and_then(|v| v.as_array()) {
647        for sub in nested {
648            if sub.get("unionAll").is_some() {
649                return Err(SofError::Uncompilable {
650                    reason: "unionAll nested inside another select is not supported".to_string(),
651                });
652            }
653            let sub_parts =
654                read_clause_columns_and_iter(sub, &focus, env, alias_seq, dialect, target)?;
655            if sub_parts.recurse.is_some() {
656                return Err(SofError::Uncompilable {
657                    reason: "select.repeat nested inside another select is not yet supported"
658                        .to_string(),
659                });
660            }
661            unnests.extend(sub_parts.unnests);
662            columns.extend(sub_parts.columns);
663        }
664    }
665
666    Ok(ClauseParts {
667        columns,
668        unnests,
669        recurse: None,
670        extra_filter: None,
671    })
672}
673
674/// Reads the `column[]` array for a clause, lowering each path under `focus`.
675fn read_columns(
676    clause: &Value,
677    focus: &str,
678    env: &mut CompileEnv,
679) -> Result<Vec<Column>, SofError> {
680    let columns = match clause.get("column").and_then(|v| v.as_array()) {
681        Some(cols) if !cols.is_empty() => cols,
682        _ => return Ok(Vec::new()),
683    };
684
685    let prev_root = env.root_alias.clone();
686    env.root_alias = focus.to_string();
687
688    let mut out = Vec::with_capacity(columns.len());
689    for col in columns {
690        let path = col.get("path").and_then(|v| v.as_str()).ok_or_else(|| {
691            SofError::InvalidViewDefinition("column.path is required".to_string())
692        })?;
693        let name = col.get("name").and_then(|v| v.as_str()).ok_or_else(|| {
694            SofError::InvalidViewDefinition("column.name is required".to_string())
695        })?;
696        let collection_opt = col.get("collection").and_then(|v| v.as_bool());
697        let collection = collection_opt.unwrap_or(false);
698        // SoF v2 spec: when `collection: false` is EXPLICITLY declared, the
699        // path MUST yield at most one value. Without FHIR schema we can't
700        // verify cardinality precisely, but a multi-Field path through
701        // commonly-multi-valued FHIR root fields is a strong signal — reject
702        // those at compile time so the validator/conformance test passes.
703        if collection_opt == Some(false)
704            && path_likely_multi_valued(path, &env.resource_type, env.fhir_version)
705        {
706            return Err(SofError::InvalidViewDefinition(format!(
707                "column '{}' declares `collection: false` but path '{}' may yield \
708                 multiple values; declare `collection: true` or pick a single element",
709                col.get("name").and_then(|v| v.as_str()).unwrap_or(""),
710                path
711            )));
712        }
713
714        // Make the column's declared type visible to function-call lowering
715        // (currently used by `lowBoundary()` / `highBoundary()` to pick
716        // decimal vs. date/dateTime/time semantics).
717        let column_type = col.get("type").and_then(|v| v.as_str()).map(String::from);
718        let prev_type_hint = env.column_type_hint.take();
719        env.column_type_hint = column_type.clone();
720        let expr_result = compile_fhirpath_expr(path, env);
721        env.column_type_hint = prev_type_hint;
722        let expr = expr_result?;
723
724        let ty = column_type_from_hint(column_type.as_deref());
725        // For `collection: true` columns, swap the scalar projection for a
726        // [`SqlExpr::CollectionAgg`] over the same path. Only paths that
727        // lower to a plain `JsonPath` qualify — anything more complex
728        // (where(), join(), etc.) keeps its scalar form.
729        let final_expr = if collection {
730            match expr {
731                SqlExpr::JsonPath { root, path } => SqlExpr::CollectionAgg { root, path },
732                other => other,
733            }
734        } else {
735            expr
736        };
737        out.push(Column {
738            name: name.to_string(),
739            expr: final_expr,
740            collection: false, // emit-time array projection is in the SqlExpr
741            ty,
742        });
743    }
744    env.root_alias = prev_root;
745    Ok(out)
746}
747
/// Heuristic: returns true when the FHIRPath source `path` is plain
/// multi-segment field navigation with no operators, function calls, or
/// boolean-yielding keywords — and therefore cannot resolve to a boolean.
/// Used by the top-level `where[]` validator to reject views whose where
/// expressions can't possibly yield true/false.
///
/// The check is purely textual and deliberately errs towards returning
/// `false` (i.e. "might be boolean"):
///
/// - An empty path or a single bare identifier (`active`, `deceased`) is
///   never flagged — a bare boolean field is coerced at the WHERE boundary.
/// - Any of `=`, `!`, `<`, `>`, `~` (covers `=`, `!=`, `<=`, `>=`, `~`, `!~`)
///   or an opening parenthesis (function call) counts as potentially boolean.
/// - Any whitespace-delimited keyword `and`, `or`, `xor`, `implies`, `not`,
///   `in`, `contains`, or `is` counts as potentially boolean. Keywords are
///   recognised regardless of whether they are separated by spaces, tabs, or
///   newlines.
fn where_path_is_provably_non_boolean(path: &str) -> bool {
    // FHIRPath keywords whose result is a boolean.
    const BOOLEAN_KEYWORDS: [&str; 8] = [
        "and", "or", "xor", "implies", "not", "in", "contains", "is",
    ];

    let trimmed = path.trim();
    // Only multi-segment navigation is ever rejected; this also covers the
    // empty string.
    if !trimmed.contains('.') {
        return false;
    }

    let has_operator_or_call = trimmed
        .chars()
        .any(|c| matches!(c, '=' | '!' | '<' | '>' | '~' | '('));
    if has_operator_or_call {
        return false;
    }

    let has_bool_kw = trimmed
        .split_whitespace()
        .any(|token| BOOLEAN_KEYWORDS.contains(&token));
    !has_bool_kw
}
771
772/// Returns true when the FHIRPath source `path` navigates *through* a
773/// collection-cardinality FHIR element. Used by the strict `collection: false`
774/// check to reject views the runtime would mishandle.
775///
776/// Uses the per-version `get_field_type` lookup tables generated from FHIR
777/// StructureDefinitions (see `helios_fhir::{r4,r4b,r5,r6}::FIELD_TYPES`). The
778/// walk only handles plain dot navigation — any segment containing `(`, `[`,
779/// or whitespace is treated as opaque and stops the walk (returning the
780/// accumulated result so far). This stays conservative: function calls like
781/// `.first()` or `.where(...)` may change cardinality in ways the lookup
782/// can't model, so we don't speculate past them.
783fn path_likely_multi_valued(path: &str, resource_type: &str, fhir_version: FhirVersion) -> bool {
784    let trimmed = path.trim();
785    if trimmed.is_empty() || resource_type.is_empty() {
786        return false;
787    }
788    let mut parent = resource_type.to_string();
789    let mut segments = trimmed.split('.').peekable();
790    while let Some(seg) = segments.next() {
791        // Opaque segment (function call, indexer, anything non-trivial) —
792        // bail rather than guess.
793        if seg.is_empty() || seg.chars().any(|c| !c.is_ascii_alphanumeric()) {
794            return false;
795        }
796        let Some((field_type, is_collection)) =
797            super::lookup_field_type(fhir_version, &parent, seg)
798        else {
799            return false;
800        };
801        // We only fail the column when the collection appears *before* the
802        // final segment — `path = "name"` (which yields the full list) is
803        // accepted because the column projection wraps it in a JSON array.
804        if is_collection && segments.peek().is_some() {
805            return true;
806        }
807        parent = field_type.to_string();
808    }
809    false
810}
811
/// Splits a forEach path source like `"name.where(use = X)"` into the base
/// path (`"name"`) and the criterion source (`"use = X"`).
///
/// Returns `None` when the source doesn't end in a `where(...)` call so
/// callers fall back to plain path lowering. "Ends in" is decided by finding
/// the parenthesis that actually matches the final `)`:
///
/// - `"name.where(a).first()"` → `None` (the last call is `first()`);
/// - `"a.where(b.where(c))"` → base `"a"`, criterion `"b.where(c)"` (the
///   outer call is the trailing one, not the inner);
/// - parentheses inside `'…'` string literals or `` `…` `` delimited
///   identifiers are ignored, including backslash-escaped delimiters.
///
/// Detection is purely textual to avoid round-trips through the FHIRPath AST
/// in the common case. Both returned strings are trimmed; the criterion is
/// always `Some` when the function returns `Some`.
fn split_trailing_where(src: &str) -> Option<(String, Option<String>)> {
    /// Byte offset of the `(` that pairs with the `)` in the last byte of
    /// `expr`, or `None` when the parentheses don't balance that way.
    fn matching_open_paren(expr: &str) -> Option<usize> {
        let last = expr.len().checked_sub(1)?;
        let mut open_stack: Vec<usize> = Vec::new();
        let mut quote: Option<char> = None;
        let mut escaped = false;
        for (idx, ch) in expr.char_indices() {
            if let Some(delim) = quote {
                // Inside a literal: only look for its (unescaped) terminator.
                if escaped {
                    escaped = false;
                } else if ch == '\\' {
                    escaped = true;
                } else if ch == delim {
                    quote = None;
                }
                continue;
            }
            match ch {
                '\'' | '`' => quote = Some(ch),
                '(' => open_stack.push(idx),
                ')' => {
                    let open = open_stack.pop()?;
                    if idx == last {
                        return Some(open);
                    }
                }
                _ => {}
            }
        }
        None
    }

    const CALL: &str = ".where";

    let trimmed = src.trim();
    if !trimmed.ends_with(')') {
        return None;
    }
    let open = matching_open_paren(trimmed)?;
    // The call owning the final parenthesis pair must be `.where`.
    let base = trimmed[..open].strip_suffix(CALL)?;
    let crit = &trimmed[open + 1..trimmed.len() - 1];
    Some((base.trim().to_string(), Some(crit.trim().to_string())))
}
830
831/// Maps a `column.type` string (per the SoF v2 spec) onto the in-DB compiler's
832/// [`SqlType`]. Unknown / absent types fall back to text — the runner's row
833/// mapper auto-parses numeric-looking text as JSON numbers, which works for
834/// most cases without explicit typing.
835fn column_type_from_hint(hint: Option<&str>) -> SqlType {
836    match hint {
837        Some("boolean") => SqlType::Boolean,
838        Some("integer") | Some("positiveInt") | Some("unsignedInt") => SqlType::Integer,
839        Some("decimal") => SqlType::Decimal,
840        _ => SqlType::Text,
841    }
842}
843
844/// Builds an inline FROM-clause string for a flattened forEach chain — one
845/// unnest per Field segment, comma-joined. Used by the trailing-`[N]`
846/// degenerate-forEach lowering, which can't put correlated subqueries in
847/// the FROM on SQLite (SQLite restriction; PG supports it via LATERAL,
848/// but we use the same SELECT-side scalar-subquery shape on both for
849/// uniformity).
850///
851/// Returns the chain SQL plus the alias of the innermost iteration row so
852/// callers can root column projections on `<deepest>.value`. Each segment's
853/// unnest source is wrapped in a dialect-appropriate type guard so
854/// non-array intermediates (FHIR singletons like `Patient.contact.name`)
855/// produce one row instead of erroring.
856fn build_degenerate_chain_sql(
857    segments: &[super::ir::JsonPath],
858    parent_focus: &str,
859    alias_seq: &mut AliasSeq,
860    dialect: &dyn super::dialect::Dialect,
861) -> (String, String) {
862    use super::ir::PathStep;
863    let mut from_parts: Vec<String> = Vec::new();
864    let mut prev = parent_focus.to_string();
865    let mut last_alias = String::new();
866    let is_sqlite = dialect.lateral_keyword().is_empty();
867    for seg in segments {
868        let alias = alias_seq.next();
869        let segs_owned: Vec<String> = seg
870            .0
871            .iter()
872            .filter_map(|s| match s {
873                PathStep::Field(n) => Some(n.clone()),
874                PathStep::Index(n) => Some(n.to_string()),
875                _ => None,
876            })
877            .collect();
878        let segs: Vec<&str> = segs_owned.iter().map(String::as_str).collect();
879        let unnest_sql = if is_sqlite {
880            // SQLite — single-arg `json_each` with a JSON-text source +
881            // path. Numeric segments use `[N]`, others use `.field`.
882            let mut path_str = String::from("$");
883            for s in &segs {
884                if s.chars().all(|c| c.is_ascii_digit()) {
885                    path_str.push('[');
886                    path_str.push_str(s);
887                    path_str.push(']');
888                } else {
889                    path_str.push('.');
890                    path_str.push_str(s);
891                }
892            }
893            if prev == "r.data" && !path_str.contains('[') {
894                format!("json_each({prev}, '{path_str}')")
895            } else {
896                let extracted = format!("json_extract({prev}, '{path_str}')");
897                let type_check = format!("json_type({prev}, '{path_str}')");
898                format!(
899                    "json_each(CASE WHEN {type_check} = 'array' THEN {extracted} \
900                     WHEN {type_check} IN ('object', 'array') THEN json_array(json({extracted})) \
901                     WHEN {type_check} IS NOT NULL THEN json_array({extracted}) \
902                     ELSE '[]' END)"
903                )
904            }
905        } else {
906            // PostgreSQL — `jsonb_array_elements` over a `jsonb_typeof`
907            // type-guard so object intermediates (FHIR singletons) get
908            // wrapped in a single-element array. Numeric segments are
909            // path-array integers; field segments are path-array strings.
910            //
911            // `prev` may be either a jsonb expression (e.g. `r.data` or
912            // `<alias>.value` from jsonb_array_elements) or a text-typed
913            // correlated SELECT (when feeding from a prior ScalarFromChain
914            // whose projection used the `->>` text operator). Cast to
915            // jsonb so navigation works in both cases — `(jsonb)::jsonb`
916            // is a no-op, `(text)::jsonb` parses the JSON text.
917            let prev_jsonb = format!("({prev})::jsonb");
918            let nav = if segs.len() == 1 {
919                format!("{prev_jsonb}->'{}'", segs[0])
920            } else {
921                format!("{prev_jsonb}#>'{{{}}}'", segs.join(","))
922            };
923            format!(
924                "jsonb_array_elements(CASE WHEN jsonb_typeof({nav}) = 'array' THEN {nav} \
925                 WHEN jsonb_typeof({nav}) IS NOT NULL THEN jsonb_build_array({nav}) \
926                 ELSE '[]'::jsonb END)"
927            )
928        };
929        let from_part = if is_sqlite {
930            format!("{unnest_sql} {alias}")
931        } else {
932            // PG — give the table-function alias `<alias>(value)` so callers
933            // can reference `<alias>.value` uniformly.
934            format!("{unnest_sql} AS {alias}(value)")
935        };
936        from_parts.push(from_part);
937        last_alias = alias.clone();
938        prev = format!("{alias}.value");
939    }
940    (from_parts.join(", "), last_alias)
941}
942
943/// Splits a FHIRPath JSON path into one [`JsonPath`] per `Field` step.
944///
945/// `Index` steps stay grouped with the immediately-preceding `Field` so that
946/// `name[0].use` still drives a single navigation step into the first name
947/// before unnesting `use`. `OfType` / `TypeFilter` follow the same grouping.
948fn split_path_into_segments(path: &super::ir::JsonPath) -> Vec<super::ir::JsonPath> {
949    let mut segments: Vec<super::ir::JsonPath> = Vec::new();
950    let mut current: Vec<PathStep> = Vec::new();
951    for step in &path.0 {
952        match step {
953            PathStep::Field(_) => {
954                if !current.is_empty() {
955                    segments.push(super::ir::JsonPath(std::mem::take(&mut current)));
956                }
957                current.push(step.clone());
958            }
959            _ => current.push(step.clone()),
960        }
961    }
962    if !current.is_empty() {
963        segments.push(super::ir::JsonPath(current));
964    }
965    segments
966}
967
968/// Wraps `parent` in a chain of LateralUnnest nodes — outer-most last so the
969/// emitter walks from Scan upward and orders the JOINs correctly.
970fn apply_unnests(parent: PlanNode, unnests: &[UnnestStep]) -> PlanNode {
971    let mut p = parent;
972    for u in unnests {
973        p = PlanNode::LateralUnnest {
974            parent: Box::new(p),
975            source: u.source.clone(),
976            out_alias: u.out_alias.clone(),
977            left_join: u.left_join,
978            on_filter: u.on_filter.clone(),
979            flat_index: u.flat_index,
980        };
981    }
982    p
983}
984
985/// Final-step sanity check — `plan_clause_list` always returns either a
986/// `Project` or a `Union` of `Project`s; nothing else should reach the
987/// emitter at the top level.
988fn ensure_project(plan: PlanNode) -> Result<PlanNode, SofError> {
989    match &plan {
990        PlanNode::Project { .. } | PlanNode::Union(_) => Ok(plan),
991        other => Err(SofError::InvalidViewDefinition(format!(
992            "plan_clause_list returned an unexpected top node: {other:?}"
993        ))),
994    }
995}
996
997/// Sequentially-numbered alias generator for lateral unnests (`fe1`, `fe2`, …).
998/// Keeps generated SQL deterministic and avoids alias collisions when sibling
999/// or nested clauses each introduce their own forEach.
1000#[derive(Debug, Default)]
1001struct AliasSeq {
1002    next: usize,
1003}
1004
1005impl AliasSeq {
1006    fn new() -> Self {
1007        Self { next: 0 }
1008    }
1009    fn next(&mut self) -> String {
1010        self.next += 1;
1011        // The first unnest gets the legacy `fe` alias so existing test
1012        // assertions (which look for `fe.value`/`AS fe(value)`) keep matching.
1013        if self.next == 1 {
1014            FOREACH_ALIAS_PREFIX.to_string()
1015        } else {
1016            format!("{FOREACH_ALIAS_PREFIX}{}", self.next)
1017        }
1018    }
1019    fn next_recurse(&mut self) -> String {
1020        self.next += 1;
1021        format!("rec_{}", self.next - 1)
1022    }
1023}
1024
// Anonymous constant whose only job is to mention `PathStep` in a type
// position, keeping the `PathStep` import referenced in this module. It
// introduces no name and is never read. Original rationale: `PathStep` is
// consumed when read_clause receives a JsonPath from compile_fhirpath_expr.
// NOTE(review): `split_path_into_segments` already matches on `PathStep`
// directly, so this placeholder looks redundant — confirm before removing.
const _: Option<PathStep> = None;