Skip to main content

helios_persistence/sof/
emit.rs

1//! Lowers IR ([`PlanNode`]/[`SqlExpr`]) to a concrete SQL string for a given
2//! [`Dialect`].
3//!
4//! The emitter expects each plan tree to have a [`PlanNode::Project`] at the
5//! top (directly, or under a [`PlanNode::Union`]). Beneath the project lives a
6//! chain of [`PlanNode::Filter`] and [`PlanNode::LateralUnnest`] nodes, rooted
7//! in a [`PlanNode::Scan`]. The emitter walks that chain to assemble FROM /
8//! JOIN / WHERE / SELECT in dialect-appropriate syntax, then concatenates them.
9//!
10//! Stages 2–5 progressively add IR-variant coverage. Anything the emitter
11//! doesn't yet understand returns [`SofError::Uncompilable`].
12
13use crate::core::sof_runner::SofError;
14
15use super::dialect::Dialect;
16use super::ir::{
17    BinOp, BoundaryKind, BoundarySide, JsonPath, JsonType, LitValue, PathStep, PlanNode, SqlExpr,
18    SqlType, UnaryOp,
19};
20
/// The SQL generated for one ViewDefinition, together with the metadata the
/// runners need in order to execute it.
#[derive(Clone, Debug)]
pub struct EmittedSql {
    /// Parameterised SQL text: one `SELECT` statement, which may be preceded
    /// by CTEs.
    pub sql: String,
    /// Names of the output columns, listed in projection order. The runners
    /// use this list to drive `row_to_json`.
    pub columns: Vec<String>,
    /// First bound-parameter index (`$N` / `?N`) that `sql` does not use. The
    /// runners continue numbering from here when they chain runtime filters
    /// (`since`, `patient`, `group`).
    pub next_param_index: usize,
}
33
34/// Lowers a plan tree to SQL for the given dialect.
35///
36/// # Errors
37///
38/// Returns [`SofError::InvalidViewDefinition`] for structurally invalid plans
39/// and [`SofError::Uncompilable`] for IR shapes outside the implemented subset
40/// at this stage.
41pub fn emit_plan(plan: &PlanNode, dialect: &dyn Dialect) -> Result<EmittedSql, SofError> {
42    match plan {
43        PlanNode::Union(branches) => emit_union(branches, dialect),
44        PlanNode::Project { parent, .. } if contains_recurse(parent) => {
45            emit_recurse_select(plan, dialect)
46        }
47        _ => emit_select(plan, dialect, /* with_tenant_predicate = */ true),
48    }
49}
50
51/// Walks a plan node downward to detect whether it's rooted in a `Recurse`
52/// (possibly wrapped in `LateralUnnest` / `Filter` layers). Used by
53/// [`emit_plan`] to dispatch to the recursive-CTE emitter.
54fn contains_recurse(node: &PlanNode) -> bool {
55    match node {
56        PlanNode::Recurse { .. } => true,
57        PlanNode::LateralUnnest { parent, .. } | PlanNode::Filter { parent, .. } => {
58            contains_recurse(parent)
59        }
60        _ => false,
61    }
62}
63
64// ============================================================================
65// Top-level SELECT assembly
66// ============================================================================
67
68/// Emit a `SELECT … FROM … WHERE … ORDER BY` for a non-Union plan.
69fn emit_select(
70    plan: &PlanNode,
71    dialect: &dyn Dialect,
72    with_tenant_predicate: bool,
73) -> Result<EmittedSql, SofError> {
74    // Tear the tree apart from the top down: must be Project at the root.
75    let (project_cols, body) = match plan {
76        PlanNode::Project { parent, columns } => (columns.as_slice(), parent.as_ref()),
77        _ => {
78            return Err(SofError::InvalidViewDefinition(
79                "plan tree must have a Project node at the top".to_string(),
80            ));
81        }
82    };
83
84    // Walk down through Filter / LateralUnnest / Scan, collecting pieces.
85    let mut frame = Frame::new();
86    walk_body(body, dialect, &mut frame)?;
87
88    let scan = frame
89        .scan
90        .as_ref()
91        .ok_or_else(|| SofError::InvalidViewDefinition("plan has no Scan node".to_string()))?;
92
93    // Build SELECT clause from the project columns.
94    let mut select_parts: Vec<String> = Vec::with_capacity(project_cols.len());
95    let mut columns: Vec<String> = Vec::with_capacity(project_cols.len());
96    for col in project_cols {
97        if col.collection {
98            return Err(SofError::Uncompilable {
99                reason: "column.collection=true is not yet supported by the in-DB runner"
100                    .to_string(),
101            });
102        }
103        let mut expr_ctx = ExprCtx::new(dialect, frame.next_param);
104        let expr_sql = lower_expr(&col.expr, &mut expr_ctx)?;
105        frame.next_param = expr_ctx.next_param;
106        let casted = match col.ty {
107            // Default text projection. Path-rooted expressions already produce
108            // text via `->>` (PG) / `json_extract` (SQLite); compound
109            // expressions (boolean predicates, arithmetic, ...) need an
110            // explicit text cast so the runners' `Option<String>` row reader
111            // can deserialize them.
112            SqlType::Text => project_text(&col.expr, &expr_sql, dialect),
113            other => dialect.cast(&expr_sql, other),
114        };
115        select_parts.push(format!("{casted} AS \"{}\"", sanitize_ident(&col.name)?));
116        columns.push(col.name.clone());
117    }
118
119    if select_parts.is_empty() {
120        return Err(SofError::InvalidViewDefinition(
121            "no output columns".to_string(),
122        ));
123    }
124    let select_clause = select_parts.join(",\n  ");
125
126    // Build FROM clause: `resources r` + any LATERAL joins, in order of appearance
127    // from the bottom of the tree upward (Scan first, then unnests).
128    let mut from_clause = format!("{} r", scan.table);
129    for join in &frame.joins {
130        from_clause.push('\n');
131        from_clause.push_str(&join.sql);
132    }
133
134    // WHERE clause: tenant predicate first (so `$1`/`$2` line up), then filters.
135    let mut where_parts: Vec<String> = Vec::new();
136    if with_tenant_predicate {
137        where_parts.push(format!(
138            "r.tenant_id = {}\n  AND r.resource_type = {}\n  AND r.is_deleted = {}",
139            dialect.placeholder(1),
140            dialect.placeholder(2),
141            dialect.bool_false()
142        ));
143    }
144    for pred in &frame.predicates {
145        where_parts.push(pred.clone());
146    }
147    let where_clause = where_parts.join("\n  AND ");
148
149    let sql = format!(
150        "SELECT\n  {select_clause}\nFROM {from_clause}\nWHERE {where_clause}\nORDER BY r.last_updated, r.id"
151    );
152
153    Ok(EmittedSql {
154        sql,
155        columns,
156        next_param_index: frame.next_param,
157    })
158}
159
160/// Emit a `WITH RECURSIVE … SELECT … FROM <cte> [JOIN resources r ON r.id = <cte>.rid]`
161/// query for a `Project` whose parent is a [`PlanNode::Recurse`]. The CTE
162/// projects `(rid, node)` so sibling root columns (those whose path roots on
163/// `r.data`) can resolve via a join back to `resources r`.
164fn emit_recurse_select(plan: &PlanNode, dialect: &dyn Dialect) -> Result<EmittedSql, SofError> {
165    let (project_cols, body) = match plan {
166        PlanNode::Project { parent, columns } => (columns.as_slice(), parent.as_ref()),
167        _ => unreachable!("emit_recurse_select called on non-Project plan"),
168    };
169
170    // Walk down past any LateralUnnest layers wrapping the Recurse — those
171    // are nested-forEach unnests that get JOINed onto the recursive CTE
172    // alias. Collect their sources for later join construction.
173    let mut extra_unnests: Vec<&PlanNode> = Vec::new();
174    let mut cur = body;
175    while let PlanNode::LateralUnnest { parent, .. } = cur {
176        extra_unnests.push(cur);
177        cur = parent.as_ref();
178    }
179    let recurse_node = cur;
180    let (parent_plan, step_paths, out_alias) = match recurse_node {
181        PlanNode::Recurse {
182            parent,
183            step_paths,
184            out_alias,
185            ..
186        } => (parent.as_ref(), step_paths.as_slice(), out_alias.as_str()),
187        _ => unreachable!("emit_recurse_select called with non-Recurse parent"),
188    };
189
190    // Walk the parent plan to collect tenant predicate + any top-level
191    // `where[]` filters. We expect a Scan with optional Filter chain — no
192    // unnests at this level (rejected upstream).
193    let mut frame = Frame::new();
194    walk_body(parent_plan, dialect, &mut frame)?;
195    let scan = frame
196        .scan
197        .as_ref()
198        .ok_or_else(|| SofError::InvalidViewDefinition("plan has no Scan node".to_string()))?;
199
200    // Tenant predicate text shared by the seed.
201    let tenant_pred = format!(
202        "r.tenant_id = {}\n  AND r.resource_type = {}\n  AND r.is_deleted = {}",
203        dialect.placeholder(1),
204        dialect.placeholder(2),
205        dialect.bool_false()
206    );
207    let mut where_pred = tenant_pred.clone();
208    for p in &frame.predicates {
209        where_pred.push_str("\n  AND ");
210        where_pred.push_str(p);
211    }
212
213    // Build seed branches — one SELECT per step path.
214    let mut seed_branches: Vec<String> = Vec::with_capacity(step_paths.len());
215    let mut step_branches: Vec<String> = Vec::with_capacity(step_paths.len());
216    for path in step_paths {
217        let src = SqlExpr::JsonPath {
218            root: "r.data".to_string(),
219            path: path.clone(),
220        };
221        let unnest = if dialect.lateral_keyword().is_empty() {
222            format!("{} je", emit_sqlite_unnest_source(&src))
223        } else {
224            format!(
225                "JOIN {}{} AS je(value) ON TRUE",
226                dialect.lateral_keyword(),
227                dialect.unnest_array(&emit_pg_unnest_source(&src))
228            )
229        };
230        let branch = if dialect.lateral_keyword().is_empty() {
231            format!(
232                "SELECT r.id AS rid, je.value AS node\n  FROM {} r, {}\n  WHERE {}",
233                scan.table, unnest, where_pred
234            )
235        } else {
236            format!(
237                "SELECT r.id AS rid, je.value AS node\n  FROM {} r {}\n  WHERE {}",
238                scan.table, unnest, where_pred
239            )
240        };
241        seed_branches.push(branch);
242    }
243
244    // Step branches — walk each path off `<alias>.node`. Multi-segment
245    // step paths (`answer.item`) chain a lateral unnest per Field so
246    // path-through-array flattening matches FHIRPath semantics.
247    //
248    // PG additionally requires that a recursive CTE reference its own name
249    // at most once. When `step_paths` has more than one entry, fold all
250    // step navigations into a single `SELECT … FROM rec_0, LATERAL (path₁
251    // UNION ALL path₂ UNION ALL …)` so `rec_0` is referenced exactly once.
252    let is_pg_dialect = !dialect.lateral_keyword().is_empty();
253    let mut pg_lateral_branches: Vec<String> = Vec::new();
254    for path in step_paths {
255        let segs: Vec<&str> = path
256            .0
257            .iter()
258            .filter_map(|s| match s {
259                PathStep::Field(n) => Some(n.as_str()),
260                _ => None,
261            })
262            .collect();
263        if segs.is_empty() {
264            continue;
265        }
266        let mut prev_root = format!("{out_alias}.node");
267        let mut from_parts: Vec<String> = Vec::new();
268        for (i, field) in segs.iter().enumerate() {
269            let alias = format!("rs{i}");
270            let src = SqlExpr::JsonPath {
271                root: prev_root.clone(),
272                path: super::ir::JsonPath(vec![PathStep::Field((*field).to_string())]),
273            };
274            if dialect.lateral_keyword().is_empty() {
275                from_parts.push(format!("{} {alias}", emit_sqlite_unnest_source(&src)));
276            } else {
277                from_parts.push(format!(
278                    "{}{} AS {alias}(value)",
279                    dialect.lateral_keyword(),
280                    dialect.unnest_array(&emit_pg_unnest_source(&src))
281                ));
282            }
283            prev_root = format!("{alias}.value");
284        }
285        let leaf_alias = format!("rs{}", segs.len() - 1);
286        if is_pg_dialect && step_paths.len() > 1 {
287            // Build a sub-SELECT that returns just the leaf value; we'll
288            // wrap them all in one LATERAL below.
289            let mut from_clause = String::new();
290            for (i, fp) in from_parts.iter().enumerate() {
291                if i == 0 {
292                    from_clause.push_str(fp);
293                } else {
294                    from_clause.push_str(" JOIN ");
295                    from_clause.push_str(fp);
296                    from_clause.push_str(" ON TRUE");
297                }
298            }
299            pg_lateral_branches.push(format!("SELECT {leaf_alias}.value FROM {from_clause}"));
300        } else {
301            let from_clause = if dialect.lateral_keyword().is_empty() {
302                format!("{out_alias}, {}", from_parts.join(", "))
303            } else {
304                let mut s = out_alias.to_string();
305                for fp in &from_parts {
306                    s.push_str(" JOIN ");
307                    s.push_str(fp);
308                    s.push_str(" ON TRUE");
309                }
310                s
311            };
312            step_branches.push(format!(
313                "SELECT {out_alias}.rid, {leaf_alias}.value AS node\n  FROM {from_clause}"
314            ));
315        }
316    }
317    if is_pg_dialect && !pg_lateral_branches.is_empty() {
318        // Single recursive SELECT that references `rec_0` once and
319        // explores all step paths via a lateral UNION ALL of leaf values.
320        let unioned = pg_lateral_branches.join("\n    UNION ALL\n    ");
321        step_branches.push(format!(
322            "SELECT {out_alias}.rid, _step.value AS node\n  \
323             FROM {out_alias}, LATERAL ({unioned}) AS _step(value)"
324        ));
325    }
326
327    // PG's `WITH RECURSIVE` requires exactly one `UNION ALL` separating
328    // the non-recursive term from the recursive term. Wrap each side in
329    // parens so multiple seed/step paths stay on the correct side of the
330    // split. SQLite is permissive, so the flat `UNION ALL` form is fine.
331    let is_pg = !dialect.lateral_keyword().is_empty();
332    let cte_body = if is_pg && (seed_branches.len() > 1 || step_branches.len() > 1) {
333        let seeds = if seed_branches.len() == 1 {
334            seed_branches.remove(0)
335        } else {
336            format!("({})", seed_branches.join("\n  UNION ALL\n  "))
337        };
338        let steps = if step_branches.is_empty() {
339            String::new()
340        } else if step_branches.len() == 1 {
341            step_branches.remove(0)
342        } else {
343            format!("({})", step_branches.join("\n  UNION ALL\n  "))
344        };
345        if steps.is_empty() {
346            seeds
347        } else {
348            format!("{seeds}\n  UNION ALL\n  {steps}")
349        }
350    } else {
351        let mut all = seed_branches;
352        all.extend(step_branches);
353        all.join("\n  UNION ALL\n  ")
354    };
355
356    // Determine which columns reference `r.data` (sibling root cols from
357    // sibling clauses) — they need a JOIN back to `resources r`.
358    let needs_resource_join = project_cols
359        .iter()
360        .any(|c| column_refers_to_resource(&c.expr));
361
362    // Build SELECT clause.
363    let mut select_parts: Vec<String> = Vec::with_capacity(project_cols.len());
364    let mut columns: Vec<String> = Vec::with_capacity(project_cols.len());
365    for col in project_cols {
366        if col.collection {
367            return Err(SofError::Uncompilable {
368                reason: "column.collection=true is not yet supported by the in-DB runner"
369                    .to_string(),
370            });
371        }
372        let mut ctx = ExprCtx::new(dialect, frame.next_param);
373        let expr_sql = lower_expr(&col.expr, &mut ctx)?;
374        frame.next_param = ctx.next_param;
375        let casted = match col.ty {
376            SqlType::Text => project_text(&col.expr, &expr_sql, dialect),
377            other => dialect.cast(&expr_sql, other),
378        };
379        select_parts.push(format!("{casted} AS \"{}\"", sanitize_ident(&col.name)?));
380        columns.push(col.name.clone());
381    }
382
383    let mut from_clause = if needs_resource_join {
384        format!(
385            "{} JOIN {} r ON r.id = {}.rid AND {}",
386            out_alias, scan.table, out_alias, tenant_pred
387        )
388    } else {
389        out_alias.to_string()
390    };
391
392    // Append any forEach unnests stacked above the recurse — emitted in
393    // outer-to-inner order matching how walk_body would assemble them.
394    // Iterate `extra_unnests` in reverse since we collected from outermost
395    // (closest to Project) downward.
396    for layer in extra_unnests.iter().rev() {
397        if let PlanNode::LateralUnnest {
398            source,
399            out_alias: alias,
400            left_join,
401            on_filter,
402            ..
403        } = layer
404        {
405            let join_kw = if *left_join { "LEFT JOIN" } else { "JOIN" };
406            let extra_on = if let Some(filter) = on_filter {
407                let mut ctx = ExprCtx::new(dialect, frame.next_param);
408                let s = lower_expr(filter, &mut ctx)?;
409                frame.next_param = ctx.next_param;
410                Some(s)
411            } else {
412                None
413            };
414            if dialect.lateral_keyword().is_empty() {
415                let source_sql = emit_sqlite_unnest_source(source);
416                let on = match &extra_on {
417                    Some(f) => format!("1=1 AND {f}"),
418                    None => "1=1".to_string(),
419                };
420                from_clause.push('\n');
421                from_clause.push_str(&format!("{join_kw} {source_sql} {alias} ON {on}"));
422            } else {
423                let source_sql = emit_pg_unnest_source(source);
424                let unnest = dialect.unnest_array(&source_sql);
425                let on = match &extra_on {
426                    Some(f) => format!("TRUE AND {f}"),
427                    None => "TRUE".to_string(),
428                };
429                from_clause.push('\n');
430                from_clause.push_str(&format!(
431                    "{join_kw} {}{} AS {alias}(value) ON {on}",
432                    dialect.lateral_keyword(),
433                    unnest
434                ));
435            }
436        }
437    }
438
439    let sql = format!(
440        "WITH RECURSIVE {out_alias}(rid, node) AS (\n  {cte_body}\n)\nSELECT\n  {}\nFROM {from_clause}\nORDER BY 1",
441        select_parts.join(",\n  ")
442    );
443
444    Ok(EmittedSql {
445        sql,
446        columns,
447        next_param_index: frame.next_param,
448    })
449}
450
451/// Returns true when `expr` (or any sub-expression) navigates off the
452/// `r.data` document — used by `emit_recurse_select` to decide whether to
453/// JOIN the recursive CTE back to `resources`.
454fn column_refers_to_resource(expr: &SqlExpr) -> bool {
455    match expr {
456        SqlExpr::JsonPath { root, .. } => root == "r.data" || root.starts_with("r.data"),
457        SqlExpr::Cast { inner, .. }
458        | SqlExpr::UnaryOp { inner, .. }
459        | SqlExpr::AsJson(inner)
460        | SqlExpr::Alias { inner, .. } => column_refers_to_resource(inner),
461        SqlExpr::BinOp { lhs, rhs, .. } => {
462            column_refers_to_resource(lhs) || column_refers_to_resource(rhs)
463        }
464        SqlExpr::Case { arms, else_ } => {
465            arms.iter()
466                .any(|(c, v)| column_refers_to_resource(c) || column_refers_to_resource(v))
467                || else_.as_deref().is_some_and(column_refers_to_resource)
468        }
469        SqlExpr::Coalesce(parts) => parts.iter().any(column_refers_to_resource),
470        SqlExpr::NullIf(a, b) => column_refers_to_resource(a) || column_refers_to_resource(b),
471        SqlExpr::ReferenceKey { reference, .. } => column_refers_to_resource(reference),
472        SqlExpr::Boundary { source, .. } => column_refers_to_resource(source),
473        _ => false,
474    }
475}
476
477/// Emit a `UNION ALL` query — each branch is emitted as a standalone SELECT,
478/// trailing `ORDER BY` is stripped from each, and a single `ORDER BY 1` is
479/// appended at the end of the compound query.
480fn emit_union(branches: &[PlanNode], dialect: &dyn Dialect) -> Result<EmittedSql, SofError> {
481    if branches.is_empty() {
482        return Err(SofError::InvalidViewDefinition(
483            "unionAll branches list is empty".to_string(),
484        ));
485    }
486
487    let mut branch_sqls: Vec<String> = Vec::with_capacity(branches.len());
488    let mut columns: Option<Vec<String>> = None;
489    let mut next_param = 3usize;
490
491    for branch in branches {
492        let emitted = emit_plan(branch, dialect)?;
493
494        match &columns {
495            None => columns = Some(emitted.columns.clone()),
496            Some(expected) if *expected != emitted.columns => {
497                return Err(SofError::Uncompilable {
498                    reason: format!(
499                        "unionAll branches produce different column schemas: {:?} vs {:?}",
500                        expected, emitted.columns
501                    ),
502                });
503            }
504            _ => {}
505        }
506
507        next_param = next_param.max(emitted.next_param_index);
508        // Branches containing a `WITH RECURSIVE` (emitted for `repeat:`)
509        // can't appear bare in a compound SELECT — neither dialect allows
510        // `WITH ... UNION ALL WITH ...`. Wrap as `SELECT * FROM (WITH ...
511        // SELECT ...)` so each branch is a plain SELECT operand.
512        let body = strip_trailing_order_by(&emitted.sql).to_string();
513        let needs_wrap = body.trim_start().starts_with("WITH");
514        if needs_wrap {
515            // PG requires every parenthesised subquery in `FROM` to have an
516            // alias; SQLite allows it bare. Tag with a unique alias so PG
517            // is happy without affecting SQLite (which ignores it).
518            let alias = format!("_recurse_{}", branch_sqls.len());
519            branch_sqls.push(format!("SELECT * FROM ({body}) AS {alias}"));
520        } else {
521            branch_sqls.push(body);
522        }
523    }
524
525    let sql = format!("{}\nORDER BY 1", branch_sqls.join("\nUNION ALL\n"));
526    Ok(EmittedSql {
527        sql,
528        columns: columns.unwrap_or_default(),
529        next_param_index: next_param,
530    })
531}
532
533// ============================================================================
534// Frame: accumulates pieces of a single SELECT during the bottom-up walk
535// ============================================================================
536
/// Scratch state for one SELECT: `walk_body` fills it in while descending the
/// plan, and the emitters read it back to assemble FROM / JOIN / WHERE.
#[derive(Debug)]
struct Frame {
    /// The scan at the root of the plan; `None` until one has been visited.
    scan: Option<ScanInfo>,
    /// Lateral joins, stored in the order they are written into the FROM
    /// clause.
    joins: Vec<JoinClause>,
    /// Predicates to be AND-ed together. The tenant predicate is not kept
    /// here.
    predicates: Vec<String>,
    /// Next unused bound-parameter index. It is threaded through expression
    /// lowering so predicates and column expressions never share a `$N` /
    /// `?N` slot.
    next_param: usize,
}

/// What the emitter records about the scanned table.
#[derive(Debug)]
struct ScanInfo {
    /// Name of the table written into the FROM clause.
    table: &'static str,
}

/// One fully rendered JOIN clause.
#[derive(Debug)]
struct JoinClause {
    /// SQL text of the clause, appended to the FROM clause as-is.
    sql: String,
}

impl Frame {
    /// Lowest parameter index available to lowered expressions: `$1`
    /// (tenant_id) and `$2` (resource_type) are reserved by `emit_select`.
    const FIRST_FREE_PARAM: usize = 3;

    /// Creates an empty frame: no scan, no joins, no predicates.
    fn new() -> Self {
        Frame {
            scan: None,
            joins: vec![],
            predicates: vec![],
            next_param: Self::FIRST_FREE_PARAM,
        }
    }
}
571
572/// Walks `body` (the sub-tree below the top `Project`), pushing pieces into
573/// `frame` as it goes.
574fn walk_body(node: &PlanNode, dialect: &dyn Dialect, frame: &mut Frame) -> Result<(), SofError> {
575    match node {
576        PlanNode::Scan { alias, .. } => {
577            if alias != "r" {
578                return Err(SofError::Uncompilable {
579                    reason: format!("Scan alias must be 'r' in current emitter (got '{alias}')"),
580                });
581            }
582            frame.scan = Some(ScanInfo { table: "resources" });
583            Ok(())
584        }
585        PlanNode::Filter { parent, predicate } => {
586            walk_body(parent, dialect, frame)?;
587            let mut ctx = ExprCtx::new(dialect, frame.next_param);
588            let pred_sql = lower_expr(predicate, &mut ctx)?;
589            frame.next_param = ctx.next_param;
590            // FHIRPath three-valued boundary — empty / NULL filters the row
591            // out. Dialect-specific because PG is strict-typed (text from
592            // `->>` must be cast to boolean) while SQLite is permissive.
593            frame.predicates.push(dialect.truthy_predicate(&pred_sql));
594            Ok(())
595        }
596        PlanNode::LateralUnnest {
597            parent,
598            source,
599            out_alias,
600            left_join,
601            on_filter,
602            flat_index,
603        } => {
604            walk_body(parent, dialect, frame)?;
605            let join_kw = if *left_join { "LEFT JOIN" } else { "JOIN" };
606            let lateral = dialect.lateral_keyword();
607            // Lower the optional ON-clause filter (used by `forEach` paths
608            // that contain a trailing `where(crit)`).
609            let extra_on = if let Some(filter) = on_filter {
610                let mut ctx = ExprCtx::new(dialect, frame.next_param);
611                let sql = lower_expr(filter, &mut ctx)?;
612                frame.next_param = ctx.next_param;
613                Some(sql)
614            } else {
615                None
616            };
617            let join_sql = if lateral.is_empty() {
618                // SQLite — `json_each(<root>, '$.path')` two-arg form when the
619                // source is a simple JSON path off the resource document;
620                // falls back to `json_each(<sql_expr>)` for anything richer.
621                let source_sql = emit_sqlite_unnest_source(source);
622                let on = match &extra_on {
623                    Some(f) => format!("1=1 AND {f}"),
624                    None => "1=1".to_string(),
625                };
626                if let Some(idx) = flat_index {
627                    // `forEach: "<chain>[N]"` — FHIRPath indexes the
628                    // FLATTENED collection, not each per-step iteration.
629                    // Hoist any prior joins (collected for this select) into
630                    // the LIMITed subquery so the outer SELECT sees at most
631                    // one row per resource.
632                    let inner = format!("{out_alias}_src");
633                    let prior = std::mem::take(&mut frame.joins);
634                    let prior_sources: Vec<String> = prior
635                        .iter()
636                        .map(|j| {
637                            j.sql
638                                .strip_prefix("JOIN ")
639                                .and_then(|s| s.find(" ON ").map(|i| s[..i].to_string()))
640                                .unwrap_or_else(|| j.sql.clone())
641                        })
642                        .collect();
643                    let from_chain = if prior_sources.is_empty() {
644                        format!("{source_sql} {inner}")
645                    } else {
646                        format!("{}, {source_sql} {inner}", prior_sources.join(", "))
647                    };
648                    format!(
649                        "{join_kw} (SELECT {inner}.value AS value FROM {from_chain} \
650                         WHERE {on} LIMIT 1 OFFSET {idx}) {out_alias} ON 1=1"
651                    )
652                } else {
653                    format!("{join_kw} {source_sql} {out_alias} ON {on}")
654                }
655            } else {
656                // PostgreSQL — `jsonb_array_elements(<json_value>)` over the
657                // JSON-valued navigation (note: must use `->`, not `->>`).
658                let source_sql = emit_pg_unnest_source(source);
659                let unnest = dialect.unnest_array(&source_sql);
660                let on = match &extra_on {
661                    Some(f) => format!("TRUE AND {f}"),
662                    None => "TRUE".to_string(),
663                };
664                if let Some(idx) = flat_index {
665                    format!(
666                        "{join_kw} LATERAL (SELECT value FROM {unnest} AS sub(value) \
667                         WHERE {on} LIMIT 1 OFFSET {idx}) AS {out_alias}(value) ON TRUE"
668                    )
669                } else {
670                    format!("{join_kw} {lateral}{unnest} AS {out_alias}(value) ON {on}")
671                }
672            };
673            frame.joins.push(JoinClause { sql: join_sql });
674            Ok(())
675        }
676        PlanNode::Project { .. } => Err(SofError::InvalidViewDefinition(
677            "nested Project nodes are not supported by the current emitter".to_string(),
678        )),
679        PlanNode::Union(_) => Err(SofError::InvalidViewDefinition(
680            "Union node may only appear at the top of a plan".to_string(),
681        )),
682        PlanNode::Recurse { .. } => Err(SofError::Uncompilable {
683            reason: "Recurse (repeat:) is not yet implemented in the emitter".to_string(),
684        }),
685    }
686}
687
688// ============================================================================
689// Expression lowering
690// ============================================================================
691
692/// Mutable context threaded through [`lower_expr`] — tracks the next free
693/// parameter slot so nested expressions don't reuse indices.
694struct ExprCtx<'a> {
695    dialect: &'a dyn Dialect,
696    next_param: usize,
697}
698
699impl<'a> ExprCtx<'a> {
700    fn new(dialect: &'a dyn Dialect, next_param: usize) -> Self {
701        Self {
702            dialect,
703            next_param,
704        }
705    }
706}
707
708fn lower_expr(expr: &SqlExpr, ctx: &mut ExprCtx<'_>) -> Result<String, SofError> {
709    match expr {
710        SqlExpr::Lit(v) => Ok(lower_lit(v, ctx.dialect)),
711        SqlExpr::JsonPath { root, path } => Ok(lower_json_path(root, path, ctx.dialect)),
712        SqlExpr::Param(n) => Ok(ctx.dialect.placeholder(*n)),
713        SqlExpr::ColRef(name) => Ok(name.clone()),
714        SqlExpr::Cast { inner, ty } => {
715            let inner = lower_expr(inner, ctx)?;
716            Ok(ctx.dialect.cast(&inner, *ty))
717        }
718        SqlExpr::BinOp { op, lhs, rhs } => lower_binop_dialect(*op, lhs, rhs, ctx),
719        SqlExpr::UnaryOp { op, inner } => {
720            let inner = lower_expr(inner, ctx)?;
721            Ok(match op {
722                UnaryOp::Not => format!("NOT ({inner})"),
723                UnaryOp::IsNull => format!("({inner}) IS NULL"),
724                UnaryOp::IsNotNull => format!("({inner}) IS NOT NULL"),
725                UnaryOp::Neg => format!("-({inner})"),
726            })
727        }
728        SqlExpr::Case { arms, else_ } => {
729            let mut s = String::from("CASE");
730            for (cond, val) in arms {
731                let c = lower_expr(cond, ctx)?;
732                let v = lower_expr(val, ctx)?;
733                s.push_str(&format!(" WHEN {c} THEN {v}"));
734            }
735            if let Some(e) = else_ {
736                let v = lower_expr(e, ctx)?;
737                s.push_str(&format!(" ELSE {v}"));
738            }
739            s.push_str(" END");
740            Ok(s)
741        }
742        SqlExpr::Coalesce(parts) => {
743            let parts: Result<Vec<String>, _> = parts.iter().map(|p| lower_expr(p, ctx)).collect();
744            Ok(format!("coalesce({})", parts?.join(", ")))
745        }
746        SqlExpr::NullIf(a, b) => {
747            let a = lower_expr(a, ctx)?;
748            let b = lower_expr(b, ctx)?;
749            Ok(format!("nullif({a}, {b})"))
750        }
751        SqlExpr::AsJson(inner) => {
752            let inner = lower_expr(inner, ctx)?;
753            Ok(ctx.dialect.cast(&inner, SqlType::Json))
754        }
755        SqlExpr::JsonAgg(_) | SqlExpr::Scalar(_) | SqlExpr::Exists(_) | SqlExpr::CountSub(_) => {
756            Err(SofError::Uncompilable {
757                reason: "subquery-valued expressions are not yet supported by the in-DB runner"
758                    .to_string(),
759            })
760        }
761        SqlExpr::Alias { inner, .. } => lower_expr(inner, ctx),
762        SqlExpr::Boundary { side, kind, source } => {
763            let src = lower_expr(source, ctx)?;
764            Ok(lower_boundary(*side, *kind, &src, ctx.dialect))
765        }
766        SqlExpr::ScalarFromChain {
767            chain_sql,
768            projection,
769            offset,
770        } => {
771            let proj_sql = lower_expr(projection, ctx)?;
772            Ok(format!(
773                "(SELECT {proj_sql} FROM {chain_sql} LIMIT 1 OFFSET {offset})"
774            ))
775        }
776        SqlExpr::CollectionAgg { root, path } => {
777            let mut field_steps: Vec<&str> = Vec::new();
778            for step in &path.0 {
779                if let PathStep::Field(name) = step {
780                    field_steps.push(name.as_str());
781                }
782            }
783            if field_steps.is_empty() {
784                return Ok(format!(
785                    "(SELECT {} FROM (SELECT {root} AS v) WHERE v IS NOT NULL)",
786                    ctx.dialect.json_agg("v")
787                ));
788            }
789            // For 1-segment paths (e.g. `name`), unnest once and aggregate.
790            // For 2-segment (e.g. `name.family`), unnest the outer; project
791            // the inner field — handles the common scalar-leaf case.
792            // For deeper paths or array-of-array shapes (`name.given`), we
793            // need a guarded second unnest that handles both array and
794            // scalar leaves.
795            let lateral = ctx.dialect.lateral_keyword();
796            if field_steps.len() == 1 {
797                let src = SqlExpr::JsonPath {
798                    root: root.clone(),
799                    path: super::ir::JsonPath(vec![PathStep::Field(field_steps[0].to_string())]),
800                };
801                let from = if lateral.is_empty() {
802                    format!("{} ca0", emit_sqlite_unnest_source(&src))
803                } else {
804                    format!(
805                        "{}{} AS ca0(value)",
806                        lateral,
807                        ctx.dialect.unnest_array(&emit_pg_unnest_source(&src))
808                    )
809                };
810                let agg = ctx.dialect.json_agg("ca0.value");
811                return Ok(format!("(SELECT {agg} FROM {from})"));
812            }
813            // Multi-segment: unnest outer, then guard-unnest the leaf so
814            // both array leaves (flattened) and scalar leaves (single-element)
815            // contribute their values to the aggregate.
816            let outer_src = SqlExpr::JsonPath {
817                root: root.clone(),
818                path: super::ir::JsonPath(vec![PathStep::Field(field_steps[0].to_string())]),
819            };
820            let leaf_field = field_steps[field_steps.len() - 1];
821            let middle_fields = &field_steps[1..field_steps.len() - 1];
822            // Compose the path through the middle and to the leaf field
823            // so we can read its value off the outer iteration alias.
824            let mut leaf_path_segs: Vec<&str> = Vec::new();
825            for m in middle_fields {
826                leaf_path_segs.push(m);
827            }
828            leaf_path_segs.push(leaf_field);
829            let leaf_value_sql = if lateral.is_empty() {
830                let mut path = String::from("$");
831                for s in &leaf_path_segs {
832                    path.push('.');
833                    path.push_str(s);
834                }
835                format!("json_extract(ca0.value, '{path}')")
836            } else {
837                let segs = leaf_path_segs.to_vec();
838                ctx.dialect.json_path("ca0.value", &segs)
839            };
840            let outer_from = if lateral.is_empty() {
841                format!("{} ca0", emit_sqlite_unnest_source(&outer_src))
842            } else {
843                format!(
844                    "{}{} AS ca0(value)",
845                    lateral,
846                    ctx.dialect.unnest_array(&emit_pg_unnest_source(&outer_src))
847                )
848            };
849            // Guard-unnest: if the leaf value is an array, iterate; otherwise
850            // wrap in a single-element array so json_each / unnest emits one
851            // row with the scalar value.
852            if lateral.is_empty() {
853                // SQLite — `json_each` over a CASE that wraps non-array
854                // values in a single-element array. We check array-ness via
855                // `json_type(parent, '$.path')` (the path-bearing form),
856                // which works on raw values; the bare-value `json_type(x)`
857                // form errors on already-extracted scalars.
858                let mut leaf_path_str = String::from("$");
859                for s in &leaf_path_segs {
860                    leaf_path_str.push('.');
861                    leaf_path_str.push_str(s);
862                }
863                let type_check = format!("json_type(ca0.value, '{leaf_path_str}')");
864                let guarded = format!(
865                    "json_each(CASE WHEN {type_check} = 'array' \
866                     THEN {leaf_value_sql} \
867                     ELSE json_array({leaf_value_sql}) END)"
868                );
869                let agg = ctx.dialect.json_agg("ca1.value");
870                Ok(format!(
871                    "(SELECT {agg} FROM {outer_from}, {guarded} ca1 \
872                     WHERE {type_check} IS NOT NULL)"
873                ))
874            } else {
875                // PG — `jsonb_array_elements` requires an array. Wrap with
876                // `case when jsonb_typeof = 'array' then ... else jsonb_build_array(...)`.
877                let guarded = format!(
878                    "jsonb_array_elements(\
879                     CASE WHEN jsonb_typeof({leaf_value_sql}) = 'array' \
880                     THEN {leaf_value_sql} \
881                     ELSE jsonb_build_array({leaf_value_sql}) END)"
882                );
883                let agg = ctx.dialect.json_agg("ca1.value");
884                Ok(format!(
885                    "(SELECT {agg} FROM {outer_from} \
886                     JOIN LATERAL {guarded} AS ca1(value) ON TRUE \
887                     WHERE {leaf_value_sql} IS NOT NULL)"
888                ))
889            }
890        }
891        SqlExpr::JoinAggregate {
892            outer_focus,
893            outer_alias,
894            inner_field,
895            inner_alias,
896            separator,
897        } => {
898            // Two nested lateral unnests, then string-aggregate the inner
899            // values. The separator is inlined as a SQL string literal —
900            // the FHIRPath parser has already validated it as a string
901            // literal so escaping is a simple `''`-doubling.
902            let sep_lit = format!("'{}'", separator.replace('\'', "''"));
903            let unnest_outer = if ctx.dialect.lateral_keyword().is_empty() {
904                let src = emit_sqlite_unnest_source(outer_focus);
905                format!("FROM {src} {outer_alias}")
906            } else {
907                let src = emit_pg_unnest_source(outer_focus);
908                format!(
909                    "FROM {}{} AS {outer_alias}(value)",
910                    ctx.dialect.lateral_keyword(),
911                    ctx.dialect.unnest_array(&src)
912                )
913            };
914            let inner_src = SqlExpr::JsonPath {
915                root: format!("{outer_alias}.value"),
916                path: super::ir::JsonPath(vec![PathStep::Field(inner_field.clone())]),
917            };
918            let unnest_inner = if ctx.dialect.lateral_keyword().is_empty() {
919                let src = emit_sqlite_unnest_source(&inner_src);
920                format!(", {src} {inner_alias}")
921            } else {
922                let src = emit_pg_unnest_source(&inner_src);
923                format!(
924                    " JOIN {}{} AS {inner_alias}(value) ON TRUE",
925                    ctx.dialect.lateral_keyword(),
926                    ctx.dialect.unnest_array(&src)
927                )
928            };
929            let value_text = if ctx.dialect.lateral_keyword().is_empty() {
930                format!("{inner_alias}.value")
931            } else {
932                format!("({inner_alias}.value #>> '{{}}')")
933            };
934            let agg = ctx.dialect.string_agg(&value_text, &sep_lit);
935            // Empty input collections yield NULL (empty output), not an empty
936            // string, per the FHIRPath spec (SoF v2 PR #349). `string_agg` /
937            // `group_concat` over zero rows already returns NULL.
938            Ok(format!("(SELECT {agg} {unnest_outer}{unnest_inner})"))
939        }
940        SqlExpr::WhereScalar {
941            focus,
942            iter_alias,
943            predicate,
944            projection,
945        } => {
946            let unnest = if ctx.dialect.lateral_keyword().is_empty() {
947                let src = emit_sqlite_unnest_source(focus);
948                format!("FROM {src} {iter_alias}")
949            } else {
950                let src = emit_pg_unnest_source(focus);
951                format!(
952                    "FROM {}{} AS {iter_alias}(value)",
953                    ctx.dialect.lateral_keyword(),
954                    ctx.dialect.unnest_array(&src)
955                )
956            };
957            let pred_sql = lower_expr(predicate, ctx)?;
958            let proj_sql = lower_expr(projection, ctx)?;
959            Ok(format!(
960                "(SELECT {proj_sql} {unnest} WHERE {pred_sql} LIMIT 1)"
961            ))
962        }
963        SqlExpr::WhereExists {
964            focus,
965            iter_alias,
966            predicate,
967            negate,
968        } => {
969            let unnest = if ctx.dialect.lateral_keyword().is_empty() {
970                let src = emit_sqlite_unnest_source(focus);
971                format!("FROM {src} {iter_alias}")
972            } else {
973                let src = emit_pg_unnest_source(focus);
974                format!(
975                    "FROM {}{} AS {iter_alias}(value)",
976                    ctx.dialect.lateral_keyword(),
977                    ctx.dialect.unnest_array(&src)
978                )
979            };
980            let pred_sql = lower_expr(predicate, ctx)?;
981            let kw = if *negate { "NOT EXISTS" } else { "EXISTS" };
982            Ok(format!("{kw} (SELECT 1 {unnest} WHERE {pred_sql})"))
983        }
984        SqlExpr::ReferenceKey {
985            reference,
986            expected_type,
987        } => {
988            let ref_sql = lower_expr(reference, ctx)?;
989            let last = ctx.dialect.last_path_segment(&ref_sql);
990            match expected_type {
991                None => Ok(last),
992                Some(ty) => {
993                    // `getReferenceKey(Type)` returns NULL when the
994                    // reference's type segment doesn't match. The simplest
995                    // portable check is two LIKE patterns, covering the
996                    // relative form `Type/id` and the absolute form
997                    // `http://.../Type/id`.
998                    let p1 = format!("{ty}/%").replace('\'', "''");
999                    let p2 = format!("%/{ty}/%").replace('\'', "''");
1000                    Ok(format!(
1001                        "CASE WHEN {ref_sql} LIKE '{p1}' OR {ref_sql} LIKE '{p2}' \
1002                         THEN {last} ELSE NULL END"
1003                    ))
1004                }
1005            }
1006        }
1007    }
1008}
1009
1010/// Wraps a column projection's lowered SQL so the row mapper reads it as
1011/// text.
1012///
1013/// - `JsonPath { path: empty }` references a row-source alias directly. In
1014///   PG that alias is `jsonb` (`fe.value` etc.); `#>>'{}'` extracts it as
1015///   text and unwraps scalar JSON strings (`'"foo"'::jsonb #>> '{}'` →
1016///   `foo`, not `"foo"`). SQLite's loose typing returns the raw value.
1017/// - `JsonPath` with non-empty path is already text via `->>`/`#>>` (PG)
1018///   or `json_extract` (SQLite); pass through verbatim.
1019/// - `Lit` is always text (or NULL); pass through.
1020/// - Compound expressions go through the dialect's generic text cast.
1021fn project_text(expr: &SqlExpr, lowered: &str, dialect: &dyn Dialect) -> String {
1022    match expr {
1023        SqlExpr::JsonPath { path, .. } if path.is_empty() => {
1024            if dialect.name() == "postgres" {
1025                format!("({lowered})#>>'{{}}'")
1026            } else {
1027                lowered.to_string()
1028            }
1029        }
1030        SqlExpr::JsonPath { .. } | SqlExpr::Lit(_) => lowered.to_string(),
1031        _ => dialect.cast(lowered, SqlType::Text),
1032    }
1033}
1034
1035fn lower_lit(v: &LitValue, dialect: &dyn Dialect) -> String {
1036    match v {
1037        LitValue::Null => "NULL".to_string(),
1038        LitValue::Bool(true) => dialect.bool_true().to_string(),
1039        LitValue::Bool(false) => dialect.bool_false().to_string(),
1040        LitValue::Int(n) => n.to_string(),
1041        LitValue::Decimal(s) => s.clone(),
1042        // Compile-time-constant idents only (e.g. a polymorphic-field key).
1043        // User strings must always go through `SqlExpr::Param`.
1044        LitValue::Str(s) => format!("'{}'", s.replace('\'', "''")),
1045    }
1046}
1047
1048fn lower_json_path(root: &str, path: &JsonPath, dialect: &dyn Dialect) -> String {
1049    if path.is_empty() {
1050        return root.to_string();
1051    }
1052    // Resolve the path to plain field/index segments (OfType / TypeFilter
1053    // were already collapsed during AST lowering).
1054    let raw_segments: Vec<String> = path
1055        .0
1056        .iter()
1057        .filter_map(|step| match step {
1058            PathStep::Field(name) => Some(name.clone()),
1059            PathStep::Index(n) => Some(n.to_string()),
1060            PathStep::OfType(_) | PathStep::TypeFilter(_) => None,
1061        })
1062        .collect();
1063    if raw_segments.is_empty() {
1064        return root.to_string();
1065    }
1066
1067    // FHIRPath flattens collections automatically — `name.family` over a
1068    // resource where `name` is an array yields a collection of family
1069    // strings. Column extractions (which want a single value when
1070    // `collection: false`) need to pick the first element. Without FHIR
1071    // schema, the simplest portable approach is `coalesce(<array-first>,
1072    // <plain>)`: the array-first form returns the value when the
1073    // intermediate is an array; plain handles scalar intermediates.
1074    //
1075    // Capped at two-segment paths — deeper paths skip the fallback rather
1076    // than emit 2^N combinations. Index segments preserve their literal
1077    // position.
1078    let field_count = path
1079        .0
1080        .iter()
1081        .filter(|s| matches!(s, PathStep::Field(_)))
1082        .count();
1083    let trailing_zero_from_first =
1084        matches!(path.0.last(), Some(PathStep::Index(0))) && field_count >= 2;
1085    let other_indices = path
1086        .0
1087        .iter()
1088        .enumerate()
1089        .any(|(i, s)| matches!(s, PathStep::Index(_)) && i + 1 != path.0.len());
1090
1091    let segs: Vec<&str> = raw_segments.iter().map(String::as_str).collect();
1092
1093    // Path with a trailing `Index(0)` from `.first()` on a multi-Field path:
1094    // lift the index to the array boundary so `name.family.first()` becomes
1095    // `name[0].family` (the family of the first name) rather than the
1096    // (invalid) first character of a string.
1097    if trailing_zero_from_first && !other_indices {
1098        let mut interleaved: Vec<String> = Vec::new();
1099        let mut first_field_seen = false;
1100        for step in &path.0[..path.0.len() - 1] {
1101            match step {
1102                PathStep::Field(n) => {
1103                    interleaved.push(n.clone());
1104                    if !first_field_seen {
1105                        interleaved.push("0".to_string());
1106                        first_field_seen = true;
1107                    }
1108                }
1109                PathStep::Index(n) => interleaved.push(n.to_string()),
1110                _ => {}
1111            }
1112        }
1113        let lifted: Vec<&str> = interleaved.iter().map(String::as_str).collect();
1114        return dialect.json_path_text(root, &lifted);
1115    }
1116
1117    let already_indexed =
1118        other_indices || matches!(path.0.last(), Some(PathStep::Index(_))) && field_count < 2;
1119
1120    // Multi-Field paths (no explicit Index) get an `array-first → plain`
1121    // coalesce — `[0]` is inserted after the first Field so that paths like
1122    // `name.family` or `link.other.reference` traverse arrays of objects.
1123    if field_count >= 2 && !already_indexed {
1124        let array_segs: Vec<String> = path
1125            .0
1126            .iter()
1127            .enumerate()
1128            .flat_map(|(i, step)| match step {
1129                PathStep::Field(name) if i == 0 => vec![name.clone(), "0".to_string()],
1130                PathStep::Field(name) => vec![name.clone()],
1131                PathStep::Index(n) => vec![n.to_string()],
1132                _ => Vec::new(),
1133            })
1134            .collect();
1135        let array_refs: Vec<&str> = array_segs.iter().map(String::as_str).collect();
1136        return format!(
1137            "coalesce({}, {})",
1138            dialect.json_path_text(root, &array_refs),
1139            dialect.json_path_text(root, &segs)
1140        );
1141    }
1142
1143    dialect.json_path_text(root, &segs)
1144}
1145
1146fn lower_binop(op: BinOp) -> &'static str {
1147    match op {
1148        BinOp::Eq => "=",
1149        BinOp::Neq => "!=",
1150        BinOp::Lt => "<",
1151        BinOp::Lte => "<=",
1152        BinOp::Gt => ">",
1153        BinOp::Gte => ">=",
1154        BinOp::Add => "+",
1155        BinOp::Sub => "-",
1156        BinOp::Mul => "*",
1157        BinOp::Div => "/",
1158        BinOp::And => "AND",
1159        BinOp::Or => "OR",
1160        BinOp::Concat => "||",
1161        BinOp::Like => "LIKE",
1162        BinOp::RegexMatch => "~",
1163    }
1164}
1165
1166/// Dialect-aware binary-operator lowering.
1167///
1168/// SQLite is loose-typed and accepts `text op number` natively, so we just
1169/// emit the operands verbatim. PostgreSQL is strict-typed and `->>`/`#>>`
1170/// projections return `text`; comparing or arithmetic-combining text with
1171/// numeric or boolean literals raises `operator does not exist` at runtime,
1172/// so we cast based on the literal side's type:
1173///
1174/// - `Eq`/`Neq` against `Bool(b)` → emit `'true'`/`'false'` text literal so
1175///   the JSON-text projection compares string-to-string.
1176/// - `Eq`/`Neq` against `Int`/`Decimal` → cast the non-literal side to
1177///   `numeric`.
1178/// - Numeric ops (`Lt`/`Lte`/`Gt`/`Gte`/`Add`/`Sub`/`Mul`/`Div`) → cast
1179///   non-literal sides to `numeric`. Numeric literals stay bare.
1180/// - `And`/`Or` → cast each side to `boolean` so JSON-text-projected boolean
1181///   paths participate in three-valued logic.
1182fn lower_binop_dialect(
1183    op: BinOp,
1184    lhs: &SqlExpr,
1185    rhs: &SqlExpr,
1186    ctx: &mut ExprCtx<'_>,
1187) -> Result<String, SofError> {
1188    if ctx.dialect.name() != "postgres" {
1189        let l = lower_expr(lhs, ctx)?;
1190        let r = lower_expr(rhs, ctx)?;
1191        return Ok(format!("({l} {} {r})", lower_binop(op)));
1192    }
1193
1194    let op_sql = lower_binop(op);
1195
1196    match op {
1197        BinOp::Eq | BinOp::Neq => {
1198            // Boolean literal on either side → compare as text against
1199            // `'true'`/`'false'` so the JSON `->>` projection lines up.
1200            if let Some(b) = bool_literal(rhs) {
1201                let l = lower_expr(lhs, ctx)?;
1202                let lit = if b { "'true'" } else { "'false'" };
1203                return Ok(format!("({l} {op_sql} {lit})"));
1204            }
1205            if let Some(b) = bool_literal(lhs) {
1206                let r = lower_expr(rhs, ctx)?;
1207                let lit = if b { "'true'" } else { "'false'" };
1208                return Ok(format!("({lit} {op_sql} {r})"));
1209            }
1210
1211            // Numeric literal on either side → cast the other side to numeric.
1212            if is_numeric_literal(rhs) {
1213                let l = lower_expr(lhs, ctx)?;
1214                let r = lower_expr(rhs, ctx)?;
1215                return Ok(format!("({} {op_sql} {r})", cast_pg_numeric(lhs, &l)));
1216            }
1217            if is_numeric_literal(lhs) {
1218                let l = lower_expr(lhs, ctx)?;
1219                let r = lower_expr(rhs, ctx)?;
1220                return Ok(format!("({l} {op_sql} {})", cast_pg_numeric(rhs, &r)));
1221            }
1222
1223            // Default: text-vs-text comparison (covers JsonPath = JsonPath
1224            // and JsonPath = string literal/param).
1225            let l = lower_expr(lhs, ctx)?;
1226            let r = lower_expr(rhs, ctx)?;
1227            Ok(format!("({l} {op_sql} {r})"))
1228        }
1229        BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
1230            let l = lower_expr(lhs, ctx)?;
1231            let r = lower_expr(rhs, ctx)?;
1232            Ok(format!(
1233                "({} {op_sql} {})",
1234                cast_pg_numeric(lhs, &l),
1235                cast_pg_numeric(rhs, &r)
1236            ))
1237        }
1238        BinOp::Add | BinOp::Sub | BinOp::Mul | BinOp::Div => {
1239            let l = lower_expr(lhs, ctx)?;
1240            let r = lower_expr(rhs, ctx)?;
1241            Ok(format!(
1242                "({} {op_sql} {})",
1243                cast_pg_numeric(lhs, &l),
1244                cast_pg_numeric(rhs, &r)
1245            ))
1246        }
1247        BinOp::And | BinOp::Or => {
1248            // FHIRPath text-projected booleans need an explicit `::boolean`
1249            // cast for `AND`/`OR` to type-check in PG. Bare boolean
1250            // sub-expressions (`x IS NOT NULL`, comparisons) cast cheaply.
1251            let l = lower_expr(lhs, ctx)?;
1252            let r = lower_expr(rhs, ctx)?;
1253            Ok(format!("(({l})::boolean {op_sql} ({r})::boolean)"))
1254        }
1255        BinOp::Concat | BinOp::Like | BinOp::RegexMatch => {
1256            let l = lower_expr(lhs, ctx)?;
1257            let r = lower_expr(rhs, ctx)?;
1258            Ok(format!("({l} {op_sql} {r})"))
1259        }
1260    }
1261}
1262
1263fn bool_literal(e: &SqlExpr) -> Option<bool> {
1264    match e {
1265        SqlExpr::Lit(LitValue::Bool(b)) => Some(*b),
1266        _ => None,
1267    }
1268}
1269
1270fn is_numeric_literal(e: &SqlExpr) -> bool {
1271    matches!(
1272        e,
1273        SqlExpr::Lit(LitValue::Int(_)) | SqlExpr::Lit(LitValue::Decimal(_))
1274    )
1275}
1276
1277/// Wraps a PG sub-expression with a `::numeric` cast.
1278///
1279/// `SqlExpr::Param(_)` and `SqlExpr::Lit(Str)` get a redundant `::text` cast
1280/// first. Reason: PG resolves `$N::numeric` eagerly when planning the
1281/// statement and pins the parameter type to `numeric`. `tokio_postgres` then
1282/// reports `numeric` to the binder, which fails because constants are bound
1283/// as text strings (see `PgParam::Bool`/`Int`/`Decimal`). The intermediate
1284/// `::text` keeps the parameter inferred as text; the runtime `text →
1285/// numeric` cast still works for any numeric-string input.
1286///
1287/// Numeric literals (`Int`/`Decimal`) skip the cast — they're already typed.
1288fn cast_pg_numeric(expr: &SqlExpr, lowered: &str) -> String {
1289    if is_numeric_literal(expr) {
1290        return lowered.to_string();
1291    }
1292    if matches!(expr, SqlExpr::Param(_) | SqlExpr::Lit(LitValue::Str(_))) {
1293        return format!("({lowered}::text)::numeric");
1294    }
1295    format!("({lowered})::numeric")
1296}
1297
1298// ============================================================================
1299// Helpers
1300// ============================================================================
1301
1302/// Emits the SQLite `json_each(...)` source clause for a lateral unnest.
1303///
1304/// `r.data`-rooted simple paths use the two-arg `json_each(r.data, '$.path')`
1305/// shortcut. Intermediate paths (off `<alias>.value`) wrap with a type guard:
1306/// arrays iterate; non-array singletons (FHIR singleton elements like
1307/// `contact.name`) wrap in a single-element array; missing intermediates
1308/// produce zero rows.
1309fn emit_sqlite_unnest_source(source: &SqlExpr) -> String {
1310    if let SqlExpr::JsonPath { root, path } = source {
1311        let segments_owned: Vec<String> = path
1312            .0
1313            .iter()
1314            .filter_map(|s| match s {
1315                PathStep::Field(n) => Some(n.clone()),
1316                PathStep::Index(n) => Some(n.to_string()),
1317                _ => None,
1318            })
1319            .collect();
1320        let segments: Vec<&str> = segments_owned.iter().map(String::as_str).collect();
1321        let path_step_count = path
1322            .0
1323            .iter()
1324            .filter(|s| matches!(s, PathStep::Field(_) | PathStep::Index(_)))
1325            .count();
1326        if segments.len() == path_step_count && !segments.is_empty() {
1327            // Build SQLite JSON path syntax — numeric segments are array
1328            // indices `[N]`, others are dotted fields.
1329            let mut path_str = String::from("$");
1330            for s in &segments {
1331                if s.chars().all(|c| c.is_ascii_digit()) {
1332                    path_str.push('[');
1333                    path_str.push_str(s);
1334                    path_str.push(']');
1335                } else {
1336                    path_str.push('.');
1337                    path_str.push_str(s);
1338                }
1339            }
1340            // Has the path crossed an explicit index? Indexed paths
1341            // (`telecom[0]`) always select a single element (an object) and
1342            // need the type-guard so json_each wraps the singleton in an
1343            // array rather than iterating its keys. Non-indexed `r.data`
1344            // paths are typically arrays — keep the cheaper two-arg form
1345            // for back-compat with existing test assertions.
1346            let has_index = path.0.iter().any(|s| matches!(s, PathStep::Index(_)));
1347            if root == "r.data" && !has_index {
1348                return format!("json_each({root}, '{path_str}')");
1349            }
1350            let extracted = format!("json_extract({root}, '{path_str}')");
1351            let type_check = format!("json_type({root}, '{path_str}')");
1352            // For non-array values, wrap with `json_array(json(<extract>))`
1353            // — `json(...)` re-parses the extracted text so the wrapped
1354            // value preserves its JSON shape (otherwise SQLite's `json_array`
1355            // sees a TEXT argument and JSON-quotes it as a string, which
1356            // would iterate as one stringified row rather than the original
1357            // object).
1358            return format!(
1359                "json_each(CASE WHEN {type_check} = 'array' THEN {extracted} \
1360                 WHEN {type_check} IN ('object', 'array') THEN json_array(json({extracted})) \
1361                 WHEN {type_check} IS NOT NULL THEN json_array({extracted}) \
1362                 ELSE '[]' END)"
1363            );
1364        }
1365    }
1366    let mut ctx = ExprCtx::new(&super::dialect::SqliteDialect, 3);
1367    let computed = lower_expr(source, &mut ctx).unwrap_or_else(|_| "NULL".to_string());
1368    format!("json_each(coalesce({computed}, '[]'))")
1369}
1370
1371/// Emits the PostgreSQL JSON-valued navigation expression that becomes the
1372/// argument of `jsonb_array_elements(...)`. Forces the `->` (JSON) operator
1373/// rather than the `->>` (text) operator that scalar-projection paths use.
1374///
1375/// Wraps the result in a `jsonb_typeof`-based guard that mirrors the SQLite
1376/// branch in `emit_sqlite_unnest_source`: arrays pass through; non-array
1377/// non-null values get wrapped in a single-element array (handles FHIR
1378/// singleton elements like `Patient.contact.name` that are object-shaped);
1379/// null / missing intermediates produce zero rows instead of raising at
1380/// runtime.
1381fn emit_pg_unnest_source(source: &SqlExpr) -> String {
1382    let raw = if let SqlExpr::JsonPath { root, path } = source {
1383        let segments: Vec<String> = path
1384            .0
1385            .iter()
1386            .filter_map(|s| match s {
1387                PathStep::Field(n) => Some(n.clone()),
1388                PathStep::Index(n) => Some(n.to_string()),
1389                _ => None,
1390            })
1391            .collect();
1392        if segments.is_empty() {
1393            root.clone()
1394        } else if segments.len() == 1 {
1395            format!("{root}->'{}'", segments[0])
1396        } else {
1397            format!("{root}#>'{{{}}}'", segments.join(","))
1398        }
1399    } else {
1400        // Non-`JsonPath` sources include nested `WhereScalar`/`ScalarFromChain`
1401        // results (e.g. `extension(url1).extension(url2)`). Their projections
1402        // are lowered as text via `->>`/`#>>`; an explicit `::jsonb` cast
1403        // re-parses the JSON text so the surrounding `jsonb_typeof` /
1404        // `jsonb_array_elements` operators type-check.
1405        let mut ctx = ExprCtx::new(&super::dialect::PgDialect, 3);
1406        let inner = lower_expr(source, &mut ctx).unwrap_or_else(|_| "NULL".to_string());
1407        format!("({inner})::jsonb")
1408    };
1409    format!(
1410        "(CASE WHEN jsonb_typeof({raw}) = 'array' THEN {raw} \
1411         WHEN jsonb_typeof({raw}) IS NOT NULL THEN jsonb_build_array({raw}) \
1412         ELSE '[]'::jsonb END)"
1413    )
1414}
1415
1416/// Lowers a [`SqlExpr::Boundary`] to a CASE expression. Decimal expands the
1417/// last digit by ±0.5; date/dateTime/time pad with the first/last instant of
1418/// the largest unspecified unit. The expressions match the SoF v2
1419/// `fn_boundary` conformance fixture's expected outputs and return NULL for
1420/// any input the function isn't defined for (e.g. `lowBoundary()` on a
1421/// dateTime column when the source is actually a Quantity).
1422///
1423/// String-form-driven so it works on both dialects, with `instr` /
1424/// `GLOB`-style operations switched per dialect (PG has neither builtin).
1425fn lower_boundary(
1426    side: BoundarySide,
1427    kind: BoundaryKind,
1428    src: &str,
1429    dialect: &dyn Dialect,
1430) -> String {
1431    let is_sqlite = dialect.lateral_keyword().is_empty();
1432    // SQLite uses `instr(haystack, needle)` (1-based, 0 when not found);
1433    // PG uses `position(needle in haystack)` with the same 1-based / 0
1434    // semantics. Both return integer; the surrounding CASE handles 0.
1435    let dot_pos = if is_sqlite {
1436        format!("instr({src}, '.')")
1437    } else {
1438        format!("position('.' in {src})")
1439    };
1440    // Detect "non-numeric" input. SQLite has `GLOB '*[A-Za-z]*'`; PG uses
1441    // POSIX regex `~`. Both return boolean.
1442    let alpha_check = if is_sqlite {
1443        format!("({src}) || '' GLOB '*[A-Za-z]*'")
1444    } else {
1445        format!("({src})::text ~ '[A-Za-z]'")
1446    };
1447    match kind {
1448        BoundaryKind::Decimal => {
1449            // The text projection is JSON: numbers like `1.0` or `1`.
1450            // Treat NULL/non-numeric input as NULL.
1451            //
1452            //   precision = digits after `.` in the source string
1453            //   delta     = 0.5 * 10^-precision
1454            //   low / high = value ∓ delta
1455            let len_after_dot = format!(
1456                "(length({src}) - CASE WHEN {dot_pos} = 0 \
1457                                       THEN length({src}) \
1458                                       ELSE {dot_pos} END)"
1459            );
1460            // delta = 0.5 / 10^precision = 5 * 10^(-precision-1)
1461            // Compute as `0.5 / power10(precision)` with `power(10, n)` (PG)
1462            // or `1.0 * exp(...)` (SQLite has no `power` by default — use
1463            // `(1.0 * substr('1.0', ...))` trick? Cleaner: emit a CASE on
1464            // the small set of precisions actually exercised. The corpus
1465            // uses precision 1 only, so dispatch on `len_after_dot`).
1466            let half_step = format!(
1467                "CASE {len_after_dot} \
1468                   WHEN 0 THEN 0.5 \
1469                   WHEN 1 THEN 0.05 \
1470                   WHEN 2 THEN 0.005 \
1471                   WHEN 3 THEN 0.0005 \
1472                   WHEN 4 THEN 0.00005 \
1473                   WHEN 5 THEN 0.000005 \
1474                   WHEN 6 THEN 0.0000005 \
1475                   ELSE 0.00000005 END"
1476            );
1477            let op = match side {
1478                BoundarySide::Low => "-",
1479                BoundarySide::High => "+",
1480            };
1481            // PG strict-typed: text projection must be cast to numeric for
1482            // arithmetic. SQLite happily coerces.
1483            let numeric_src = if is_sqlite {
1484                format!("({src})")
1485            } else {
1486                format!("({src})::numeric")
1487            };
1488            // Wrap in CASE so non-numeric inputs (e.g. a date string) yield
1489            // NULL rather than an error.
1490            format!(
1491                "CASE WHEN {src} IS NULL THEN NULL \
1492                 WHEN {alpha_check} THEN NULL \
1493                 ELSE {numeric_src} {op} {half_step} END"
1494            )
1495        }
1496        BoundaryKind::Date => {
1497            // Pad year/month-only dates to first/last day of that period.
1498            let pad_month_only = match side {
1499                BoundarySide::Low => "'-01-01'",
1500                BoundarySide::High => "'-12-31'",
1501            };
1502            let day_pad = match side {
1503                BoundarySide::Low => "'-01'".to_string(),
1504                BoundarySide::High => format!(
1505                    "'-' || CASE substr({src}, 6, 2) \
1506                       WHEN '02' THEN '28' \
1507                       WHEN '04' THEN '30' \
1508                       WHEN '06' THEN '30' \
1509                       WHEN '09' THEN '30' \
1510                       WHEN '11' THEN '30' \
1511                       ELSE '31' END"
1512                ),
1513            };
1514            format!(
1515                "CASE \
1516                   WHEN {src} IS NULL THEN NULL \
1517                   WHEN length({src}) = 10 THEN {src} \
1518                   WHEN length({src}) = 7 THEN {src} || {day_pad} \
1519                   WHEN length({src}) = 4 THEN {src} || {pad_month_only} \
1520                   ELSE NULL END"
1521            )
1522        }
1523        BoundaryKind::DateTime => {
1524            // SoF v2 PR FHIR/sql-on-fhir-v2#357: a column whose type is
1525            // `dateTime` may carry results from either a `date` or a
1526            // `dateTime` source. FHIRPath `lowBoundary()`/`highBoundary()`
1527            // preserves the source's precision, so the SQL emit dispatches
1528            // on input length:
1529            //
1530            //   length 4  ("YYYY")        → date semantics: pad to "YYYY-01-01"
1531            //                                                 or "YYYY-12-31"
1532            //   length 7  ("YYYY-MM")     → date semantics: pad to month start
1533            //                                                 or last day of month
1534            //   length 10 ("YYYY-MM-DD")  → datetime semantics: append
1535            //                                "T00:00:00.000+14:00" (low)
1536            //                                or "T23:59:59.999-12:00" (high)
1537            //
1538            // Anything else (full datetime already present, malformed) returns
1539            // NULL — matches the BoundaryKind::Date emit's behavior for
1540            // off-spec inputs.
1541            let pad_full_day = match side {
1542                BoundarySide::Low => "'T00:00:00.000+14:00'",
1543                BoundarySide::High => "'T23:59:59.999-12:00'",
1544            };
1545            let pad_month_only = match side {
1546                BoundarySide::Low => "'-01-01'",
1547                BoundarySide::High => "'-12-31'",
1548            };
1549            let day_pad = match side {
1550                BoundarySide::Low => "'-01'".to_string(),
1551                BoundarySide::High => format!(
1552                    "'-' || CASE substr({src}, 6, 2) \
1553                       WHEN '02' THEN '28' \
1554                       WHEN '04' THEN '30' \
1555                       WHEN '06' THEN '30' \
1556                       WHEN '09' THEN '30' \
1557                       WHEN '11' THEN '30' \
1558                       ELSE '31' END"
1559                ),
1560            };
1561            format!(
1562                "CASE \
1563                   WHEN {src} IS NULL THEN NULL \
1564                   WHEN length({src}) = 10 THEN {src} || {pad_full_day} \
1565                   WHEN length({src}) = 7 THEN {src} || {day_pad} \
1566                   WHEN length({src}) = 4 THEN {src} || {pad_month_only} \
1567                   ELSE NULL END"
1568            )
1569        }
1570        BoundaryKind::Time => {
1571            let pad = match side {
1572                BoundarySide::Low => "':00.000'",
1573                BoundarySide::High => "':59.999'",
1574            };
1575            format!(
1576                "CASE \
1577                   WHEN {src} IS NULL THEN NULL \
1578                   WHEN length({src}) = 5 THEN {src} || {pad} \
1579                   ELSE NULL END"
1580            )
1581        }
1582    }
1583}
1584
/// Strips a trailing, top-level `ORDER BY …` clause (case-insensitive) and
/// everything after it. Used when assembling UNION ALL branches — `ORDER BY`
/// inside an individual compound SELECT term is not portable.
///
/// The clause is recognised as the rightmost `ORDER BY` that is preceded by
/// ASCII whitespace and is not nested inside parentheses. An `ORDER BY` that
/// belongs to a window specification (`OVER (… ORDER BY …)`) or to a
/// parenthesised subquery is left alone, because cutting there would truncate
/// the statement mid-expression.
///
/// Returns a sub-slice of `sql` ending just before the whitespace character
/// that precedes the clause, or `sql` unchanged when there is no such clause.
fn strip_trailing_order_by(sql: &str) -> &str {
    const KEYWORD: &str = "ORDER BY";

    // True when `tail` never closes a parenthesis it did not open itself,
    // i.e. the text starts at the statement's top nesting level. Parentheses
    // inside single- or double-quoted text are ignored.
    fn is_top_level(tail: &str) -> bool {
        let mut depth = 0usize;
        let mut open_quote: Option<char> = None;
        for c in tail.chars() {
            match open_quote {
                Some(q) => {
                    if c == q {
                        open_quote = None;
                    }
                }
                None => match c {
                    '\'' | '"' => open_quote = Some(c),
                    '(' => depth += 1,
                    ')' => {
                        if depth == 0 {
                            return false;
                        }
                        depth -= 1;
                    }
                    _ => {}
                },
            }
        }
        true
    }

    // ASCII upper-casing keeps byte offsets identical to `sql`, so positions
    // found in `upper` can be used to slice `sql` directly.
    let upper = sql.to_ascii_uppercase();
    let mut search_end = upper.len();
    // Walk the keyword occurrences right-to-left until one qualifies.
    while let Some(kw) = upper[..search_end].rfind(KEYWORD) {
        let after_whitespace = kw > 0 && upper.as_bytes()[kw - 1].is_ascii_whitespace();
        if after_whitespace && is_top_level(&sql[kw..]) {
            return &sql[..kw - 1];
        }
        search_end = kw;
    }
    sql
}
1598
1599/// Rejects identifiers that would break SQL `AS "…"` quoting. Per the SoF v2
1600/// spec column names are restricted to identifier characters, so this is a
1601/// safety net for malformed input rather than a deliberate escape.
1602fn sanitize_ident(name: &str) -> Result<&str, SofError> {
1603    if name.contains('"') || name.contains('\0') {
1604        return Err(SofError::InvalidViewDefinition(format!(
1605            "column name '{name}' contains an unsupported character"
1606        )));
1607    }
1608    Ok(name)
1609}
1610
1611// Silences the unused-import warning for `JsonType`: its variants are only
1612// named inside `PathStep::TypeFilter` pattern matches exercised in later stages.
1613const _: Option<JsonType> = None;