1use crate::core::sof_runner::SofError;
14
15use super::dialect::Dialect;
16use super::ir::{
17 BinOp, BoundaryKind, BoundarySide, JsonPath, JsonType, LitValue, PathStep, PlanNode, SqlExpr,
18 SqlType, UnaryOp,
19};
20
/// Result of lowering a plan tree to SQL text.
#[derive(Debug, Clone)]
pub struct EmittedSql {
    /// The generated SQL statement.
    pub sql: String,
    /// Output column names, in projection order, as written in the plan
    /// (the emitters in this file push `col.name` unchanged).
    pub columns: Vec<String>,
    /// Value of the emitter's parameter counter when emission finished.
    /// NOTE(review): presumably the first placeholder index still free for
    /// the caller — confirm against the call sites.
    pub next_param_index: usize,
}
33
34pub fn emit_plan(plan: &PlanNode, dialect: &dyn Dialect) -> Result<EmittedSql, SofError> {
42 match plan {
43 PlanNode::Union(branches) => emit_union(branches, dialect),
44 PlanNode::Project { parent, .. } if contains_recurse(parent) => {
45 emit_recurse_select(plan, dialect)
46 }
47 _ => emit_select(plan, dialect, true),
48 }
49}
50
51fn contains_recurse(node: &PlanNode) -> bool {
55 match node {
56 PlanNode::Recurse { .. } => true,
57 PlanNode::LateralUnnest { parent, .. } | PlanNode::Filter { parent, .. } => {
58 contains_recurse(parent)
59 }
60 _ => false,
61 }
62}
63
64fn emit_select(
70 plan: &PlanNode,
71 dialect: &dyn Dialect,
72 with_tenant_predicate: bool,
73) -> Result<EmittedSql, SofError> {
74 let (project_cols, body) = match plan {
76 PlanNode::Project { parent, columns } => (columns.as_slice(), parent.as_ref()),
77 _ => {
78 return Err(SofError::InvalidViewDefinition(
79 "plan tree must have a Project node at the top".to_string(),
80 ));
81 }
82 };
83
84 let mut frame = Frame::new();
86 walk_body(body, dialect, &mut frame)?;
87
88 let scan = frame
89 .scan
90 .as_ref()
91 .ok_or_else(|| SofError::InvalidViewDefinition("plan has no Scan node".to_string()))?;
92
93 let mut select_parts: Vec<String> = Vec::with_capacity(project_cols.len());
95 let mut columns: Vec<String> = Vec::with_capacity(project_cols.len());
96 for col in project_cols {
97 if col.collection {
98 return Err(SofError::Uncompilable {
99 reason: "column.collection=true is not yet supported by the in-DB runner"
100 .to_string(),
101 });
102 }
103 let mut expr_ctx = ExprCtx::new(dialect, frame.next_param);
104 let expr_sql = lower_expr(&col.expr, &mut expr_ctx)?;
105 frame.next_param = expr_ctx.next_param;
106 let casted = match col.ty {
107 SqlType::Text => project_text(&col.expr, &expr_sql, dialect),
113 other => dialect.cast(&expr_sql, other),
114 };
115 select_parts.push(format!("{casted} AS \"{}\"", sanitize_ident(&col.name)?));
116 columns.push(col.name.clone());
117 }
118
119 if select_parts.is_empty() {
120 return Err(SofError::InvalidViewDefinition(
121 "no output columns".to_string(),
122 ));
123 }
124 let select_clause = select_parts.join(",\n ");
125
126 let mut from_clause = format!("{} r", scan.table);
129 for join in &frame.joins {
130 from_clause.push('\n');
131 from_clause.push_str(&join.sql);
132 }
133
134 let mut where_parts: Vec<String> = Vec::new();
136 if with_tenant_predicate {
137 where_parts.push(format!(
138 "r.tenant_id = {}\n AND r.resource_type = {}\n AND r.is_deleted = {}",
139 dialect.placeholder(1),
140 dialect.placeholder(2),
141 dialect.bool_false()
142 ));
143 }
144 for pred in &frame.predicates {
145 where_parts.push(pred.clone());
146 }
147 let where_clause = where_parts.join("\n AND ");
148
149 let sql = format!(
150 "SELECT\n {select_clause}\nFROM {from_clause}\nWHERE {where_clause}\nORDER BY r.last_updated, r.id"
151 );
152
153 Ok(EmittedSql {
154 sql,
155 columns,
156 next_param_index: frame.next_param,
157 })
158}
159
160fn emit_recurse_select(plan: &PlanNode, dialect: &dyn Dialect) -> Result<EmittedSql, SofError> {
165 let (project_cols, body) = match plan {
166 PlanNode::Project { parent, columns } => (columns.as_slice(), parent.as_ref()),
167 _ => unreachable!("emit_recurse_select called on non-Project plan"),
168 };
169
170 let mut extra_unnests: Vec<&PlanNode> = Vec::new();
174 let mut cur = body;
175 while let PlanNode::LateralUnnest { parent, .. } = cur {
176 extra_unnests.push(cur);
177 cur = parent.as_ref();
178 }
179 let recurse_node = cur;
180 let (parent_plan, step_paths, out_alias) = match recurse_node {
181 PlanNode::Recurse {
182 parent,
183 step_paths,
184 out_alias,
185 ..
186 } => (parent.as_ref(), step_paths.as_slice(), out_alias.as_str()),
187 _ => unreachable!("emit_recurse_select called with non-Recurse parent"),
188 };
189
190 let mut frame = Frame::new();
194 walk_body(parent_plan, dialect, &mut frame)?;
195 let scan = frame
196 .scan
197 .as_ref()
198 .ok_or_else(|| SofError::InvalidViewDefinition("plan has no Scan node".to_string()))?;
199
200 let tenant_pred = format!(
202 "r.tenant_id = {}\n AND r.resource_type = {}\n AND r.is_deleted = {}",
203 dialect.placeholder(1),
204 dialect.placeholder(2),
205 dialect.bool_false()
206 );
207 let mut where_pred = tenant_pred.clone();
208 for p in &frame.predicates {
209 where_pred.push_str("\n AND ");
210 where_pred.push_str(p);
211 }
212
213 let mut seed_branches: Vec<String> = Vec::with_capacity(step_paths.len());
215 let mut step_branches: Vec<String> = Vec::with_capacity(step_paths.len());
216 for path in step_paths {
217 let src = SqlExpr::JsonPath {
218 root: "r.data".to_string(),
219 path: path.clone(),
220 };
221 let unnest = if dialect.lateral_keyword().is_empty() {
222 format!("{} je", emit_sqlite_unnest_source(&src))
223 } else {
224 format!(
225 "JOIN {}{} AS je(value) ON TRUE",
226 dialect.lateral_keyword(),
227 dialect.unnest_array(&emit_pg_unnest_source(&src))
228 )
229 };
230 let branch = if dialect.lateral_keyword().is_empty() {
231 format!(
232 "SELECT r.id AS rid, je.value AS node\n FROM {} r, {}\n WHERE {}",
233 scan.table, unnest, where_pred
234 )
235 } else {
236 format!(
237 "SELECT r.id AS rid, je.value AS node\n FROM {} r {}\n WHERE {}",
238 scan.table, unnest, where_pred
239 )
240 };
241 seed_branches.push(branch);
242 }
243
244 let is_pg_dialect = !dialect.lateral_keyword().is_empty();
253 let mut pg_lateral_branches: Vec<String> = Vec::new();
254 for path in step_paths {
255 let segs: Vec<&str> = path
256 .0
257 .iter()
258 .filter_map(|s| match s {
259 PathStep::Field(n) => Some(n.as_str()),
260 _ => None,
261 })
262 .collect();
263 if segs.is_empty() {
264 continue;
265 }
266 let mut prev_root = format!("{out_alias}.node");
267 let mut from_parts: Vec<String> = Vec::new();
268 for (i, field) in segs.iter().enumerate() {
269 let alias = format!("rs{i}");
270 let src = SqlExpr::JsonPath {
271 root: prev_root.clone(),
272 path: super::ir::JsonPath(vec![PathStep::Field((*field).to_string())]),
273 };
274 if dialect.lateral_keyword().is_empty() {
275 from_parts.push(format!("{} {alias}", emit_sqlite_unnest_source(&src)));
276 } else {
277 from_parts.push(format!(
278 "{}{} AS {alias}(value)",
279 dialect.lateral_keyword(),
280 dialect.unnest_array(&emit_pg_unnest_source(&src))
281 ));
282 }
283 prev_root = format!("{alias}.value");
284 }
285 let leaf_alias = format!("rs{}", segs.len() - 1);
286 if is_pg_dialect && step_paths.len() > 1 {
287 let mut from_clause = String::new();
290 for (i, fp) in from_parts.iter().enumerate() {
291 if i == 0 {
292 from_clause.push_str(fp);
293 } else {
294 from_clause.push_str(" JOIN ");
295 from_clause.push_str(fp);
296 from_clause.push_str(" ON TRUE");
297 }
298 }
299 pg_lateral_branches.push(format!("SELECT {leaf_alias}.value FROM {from_clause}"));
300 } else {
301 let from_clause = if dialect.lateral_keyword().is_empty() {
302 format!("{out_alias}, {}", from_parts.join(", "))
303 } else {
304 let mut s = out_alias.to_string();
305 for fp in &from_parts {
306 s.push_str(" JOIN ");
307 s.push_str(fp);
308 s.push_str(" ON TRUE");
309 }
310 s
311 };
312 step_branches.push(format!(
313 "SELECT {out_alias}.rid, {leaf_alias}.value AS node\n FROM {from_clause}"
314 ));
315 }
316 }
317 if is_pg_dialect && !pg_lateral_branches.is_empty() {
318 let unioned = pg_lateral_branches.join("\n UNION ALL\n ");
321 step_branches.push(format!(
322 "SELECT {out_alias}.rid, _step.value AS node\n \
323 FROM {out_alias}, LATERAL ({unioned}) AS _step(value)"
324 ));
325 }
326
327 let is_pg = !dialect.lateral_keyword().is_empty();
332 let cte_body = if is_pg && (seed_branches.len() > 1 || step_branches.len() > 1) {
333 let seeds = if seed_branches.len() == 1 {
334 seed_branches.remove(0)
335 } else {
336 format!("({})", seed_branches.join("\n UNION ALL\n "))
337 };
338 let steps = if step_branches.is_empty() {
339 String::new()
340 } else if step_branches.len() == 1 {
341 step_branches.remove(0)
342 } else {
343 format!("({})", step_branches.join("\n UNION ALL\n "))
344 };
345 if steps.is_empty() {
346 seeds
347 } else {
348 format!("{seeds}\n UNION ALL\n {steps}")
349 }
350 } else {
351 let mut all = seed_branches;
352 all.extend(step_branches);
353 all.join("\n UNION ALL\n ")
354 };
355
356 let needs_resource_join = project_cols
359 .iter()
360 .any(|c| column_refers_to_resource(&c.expr));
361
362 let mut select_parts: Vec<String> = Vec::with_capacity(project_cols.len());
364 let mut columns: Vec<String> = Vec::with_capacity(project_cols.len());
365 for col in project_cols {
366 if col.collection {
367 return Err(SofError::Uncompilable {
368 reason: "column.collection=true is not yet supported by the in-DB runner"
369 .to_string(),
370 });
371 }
372 let mut ctx = ExprCtx::new(dialect, frame.next_param);
373 let expr_sql = lower_expr(&col.expr, &mut ctx)?;
374 frame.next_param = ctx.next_param;
375 let casted = match col.ty {
376 SqlType::Text => project_text(&col.expr, &expr_sql, dialect),
377 other => dialect.cast(&expr_sql, other),
378 };
379 select_parts.push(format!("{casted} AS \"{}\"", sanitize_ident(&col.name)?));
380 columns.push(col.name.clone());
381 }
382
383 let mut from_clause = if needs_resource_join {
384 format!(
385 "{} JOIN {} r ON r.id = {}.rid AND {}",
386 out_alias, scan.table, out_alias, tenant_pred
387 )
388 } else {
389 out_alias.to_string()
390 };
391
392 for layer in extra_unnests.iter().rev() {
397 if let PlanNode::LateralUnnest {
398 source,
399 out_alias: alias,
400 left_join,
401 on_filter,
402 ..
403 } = layer
404 {
405 let join_kw = if *left_join { "LEFT JOIN" } else { "JOIN" };
406 let extra_on = if let Some(filter) = on_filter {
407 let mut ctx = ExprCtx::new(dialect, frame.next_param);
408 let s = lower_expr(filter, &mut ctx)?;
409 frame.next_param = ctx.next_param;
410 Some(s)
411 } else {
412 None
413 };
414 if dialect.lateral_keyword().is_empty() {
415 let source_sql = emit_sqlite_unnest_source(source);
416 let on = match &extra_on {
417 Some(f) => format!("1=1 AND {f}"),
418 None => "1=1".to_string(),
419 };
420 from_clause.push('\n');
421 from_clause.push_str(&format!("{join_kw} {source_sql} {alias} ON {on}"));
422 } else {
423 let source_sql = emit_pg_unnest_source(source);
424 let unnest = dialect.unnest_array(&source_sql);
425 let on = match &extra_on {
426 Some(f) => format!("TRUE AND {f}"),
427 None => "TRUE".to_string(),
428 };
429 from_clause.push('\n');
430 from_clause.push_str(&format!(
431 "{join_kw} {}{} AS {alias}(value) ON {on}",
432 dialect.lateral_keyword(),
433 unnest
434 ));
435 }
436 }
437 }
438
439 let sql = format!(
440 "WITH RECURSIVE {out_alias}(rid, node) AS (\n {cte_body}\n)\nSELECT\n {}\nFROM {from_clause}\nORDER BY 1",
441 select_parts.join(",\n ")
442 );
443
444 Ok(EmittedSql {
445 sql,
446 columns,
447 next_param_index: frame.next_param,
448 })
449}
450
451fn column_refers_to_resource(expr: &SqlExpr) -> bool {
455 match expr {
456 SqlExpr::JsonPath { root, .. } => root == "r.data" || root.starts_with("r.data"),
457 SqlExpr::Cast { inner, .. }
458 | SqlExpr::UnaryOp { inner, .. }
459 | SqlExpr::AsJson(inner)
460 | SqlExpr::Alias { inner, .. } => column_refers_to_resource(inner),
461 SqlExpr::BinOp { lhs, rhs, .. } => {
462 column_refers_to_resource(lhs) || column_refers_to_resource(rhs)
463 }
464 SqlExpr::Case { arms, else_ } => {
465 arms.iter()
466 .any(|(c, v)| column_refers_to_resource(c) || column_refers_to_resource(v))
467 || else_.as_deref().is_some_and(column_refers_to_resource)
468 }
469 SqlExpr::Coalesce(parts) => parts.iter().any(column_refers_to_resource),
470 SqlExpr::NullIf(a, b) => column_refers_to_resource(a) || column_refers_to_resource(b),
471 SqlExpr::ReferenceKey { reference, .. } => column_refers_to_resource(reference),
472 SqlExpr::Boundary { source, .. } => column_refers_to_resource(source),
473 _ => false,
474 }
475}
476
477fn emit_union(branches: &[PlanNode], dialect: &dyn Dialect) -> Result<EmittedSql, SofError> {
481 if branches.is_empty() {
482 return Err(SofError::InvalidViewDefinition(
483 "unionAll branches list is empty".to_string(),
484 ));
485 }
486
487 let mut branch_sqls: Vec<String> = Vec::with_capacity(branches.len());
488 let mut columns: Option<Vec<String>> = None;
489 let mut next_param = 3usize;
490
491 for branch in branches {
492 let emitted = emit_plan(branch, dialect)?;
493
494 match &columns {
495 None => columns = Some(emitted.columns.clone()),
496 Some(expected) if *expected != emitted.columns => {
497 return Err(SofError::Uncompilable {
498 reason: format!(
499 "unionAll branches produce different column schemas: {:?} vs {:?}",
500 expected, emitted.columns
501 ),
502 });
503 }
504 _ => {}
505 }
506
507 next_param = next_param.max(emitted.next_param_index);
508 let body = strip_trailing_order_by(&emitted.sql).to_string();
513 let needs_wrap = body.trim_start().starts_with("WITH");
514 if needs_wrap {
515 let alias = format!("_recurse_{}", branch_sqls.len());
519 branch_sqls.push(format!("SELECT * FROM ({body}) AS {alias}"));
520 } else {
521 branch_sqls.push(body);
522 }
523 }
524
525 let sql = format!("{}\nORDER BY 1", branch_sqls.join("\nUNION ALL\n"));
526 Ok(EmittedSql {
527 sql,
528 columns: columns.unwrap_or_default(),
529 next_param_index: next_param,
530 })
531}
532
/// Accumulator filled by `walk_body` while it walks the plan below a
/// `Project` (or below a `Recurse`).
#[derive(Debug)]
struct Frame {
    /// Set when a `Scan` node is visited; `None` until then.
    scan: Option<ScanInfo>,
    /// One entry per `LateralUnnest` node, in the order they were pushed.
    joins: Vec<JoinClause>,
    /// Lowered `Filter` predicates, to be AND-ed into the `WHERE` clause.
    predicates: Vec<String>,
    /// Parameter counter threaded through every `ExprCtx` created for this
    /// frame.
    next_param: usize,
}
549
/// What the emitter remembers about the plan's `Scan` node.
#[derive(Debug)]
struct ScanInfo {
    /// Name of the table placed in the `FROM` clause (aliased as `r` by the
    /// emitters in this file).
    table: &'static str,
}
554
/// A fully rendered join, appended verbatim to the `FROM` clause.
#[derive(Debug)]
struct JoinClause {
    /// Complete join text, starting with its `JOIN` / `LEFT JOIN` keyword.
    sql: String,
}
559
560impl Frame {
561 fn new() -> Self {
562 Self {
563 scan: None,
564 joins: Vec::new(),
565 predicates: Vec::new(),
566 next_param: 3,
568 }
569 }
570}
571
572fn walk_body(node: &PlanNode, dialect: &dyn Dialect, frame: &mut Frame) -> Result<(), SofError> {
575 match node {
576 PlanNode::Scan { alias, .. } => {
577 if alias != "r" {
578 return Err(SofError::Uncompilable {
579 reason: format!("Scan alias must be 'r' in current emitter (got '{alias}')"),
580 });
581 }
582 frame.scan = Some(ScanInfo { table: "resources" });
583 Ok(())
584 }
585 PlanNode::Filter { parent, predicate } => {
586 walk_body(parent, dialect, frame)?;
587 let mut ctx = ExprCtx::new(dialect, frame.next_param);
588 let pred_sql = lower_expr(predicate, &mut ctx)?;
589 frame.next_param = ctx.next_param;
590 frame.predicates.push(dialect.truthy_predicate(&pred_sql));
594 Ok(())
595 }
596 PlanNode::LateralUnnest {
597 parent,
598 source,
599 out_alias,
600 left_join,
601 on_filter,
602 flat_index,
603 } => {
604 walk_body(parent, dialect, frame)?;
605 let join_kw = if *left_join { "LEFT JOIN" } else { "JOIN" };
606 let lateral = dialect.lateral_keyword();
607 let extra_on = if let Some(filter) = on_filter {
610 let mut ctx = ExprCtx::new(dialect, frame.next_param);
611 let sql = lower_expr(filter, &mut ctx)?;
612 frame.next_param = ctx.next_param;
613 Some(sql)
614 } else {
615 None
616 };
617 let join_sql = if lateral.is_empty() {
618 let source_sql = emit_sqlite_unnest_source(source);
622 let on = match &extra_on {
623 Some(f) => format!("1=1 AND {f}"),
624 None => "1=1".to_string(),
625 };
626 if let Some(idx) = flat_index {
627 let inner = format!("{out_alias}_src");
633 let prior = std::mem::take(&mut frame.joins);
634 let prior_sources: Vec<String> = prior
635 .iter()
636 .map(|j| {
637 j.sql
638 .strip_prefix("JOIN ")
639 .and_then(|s| s.find(" ON ").map(|i| s[..i].to_string()))
640 .unwrap_or_else(|| j.sql.clone())
641 })
642 .collect();
643 let from_chain = if prior_sources.is_empty() {
644 format!("{source_sql} {inner}")
645 } else {
646 format!("{}, {source_sql} {inner}", prior_sources.join(", "))
647 };
648 format!(
649 "{join_kw} (SELECT {inner}.value AS value FROM {from_chain} \
650 WHERE {on} LIMIT 1 OFFSET {idx}) {out_alias} ON 1=1"
651 )
652 } else {
653 format!("{join_kw} {source_sql} {out_alias} ON {on}")
654 }
655 } else {
656 let source_sql = emit_pg_unnest_source(source);
659 let unnest = dialect.unnest_array(&source_sql);
660 let on = match &extra_on {
661 Some(f) => format!("TRUE AND {f}"),
662 None => "TRUE".to_string(),
663 };
664 if let Some(idx) = flat_index {
665 format!(
666 "{join_kw} LATERAL (SELECT value FROM {unnest} AS sub(value) \
667 WHERE {on} LIMIT 1 OFFSET {idx}) AS {out_alias}(value) ON TRUE"
668 )
669 } else {
670 format!("{join_kw} {lateral}{unnest} AS {out_alias}(value) ON {on}")
671 }
672 };
673 frame.joins.push(JoinClause { sql: join_sql });
674 Ok(())
675 }
676 PlanNode::Project { .. } => Err(SofError::InvalidViewDefinition(
677 "nested Project nodes are not supported by the current emitter".to_string(),
678 )),
679 PlanNode::Union(_) => Err(SofError::InvalidViewDefinition(
680 "Union node may only appear at the top of a plan".to_string(),
681 )),
682 PlanNode::Recurse { .. } => Err(SofError::Uncompilable {
683 reason: "Recurse (repeat:) is not yet implemented in the emitter".to_string(),
684 }),
685 }
686}
687
/// State handed to `lower_expr` while one expression tree is lowered.
struct ExprCtx<'a> {
    /// Dialect used to render literals, casts, placeholders and JSON access.
    dialect: &'a dyn Dialect,
    /// Parameter counter; callers copy it back into their `Frame` after
    /// lowering.
    next_param: usize,
}
698
699impl<'a> ExprCtx<'a> {
700 fn new(dialect: &'a dyn Dialect, next_param: usize) -> Self {
701 Self {
702 dialect,
703 next_param,
704 }
705 }
706}
707
708fn lower_expr(expr: &SqlExpr, ctx: &mut ExprCtx<'_>) -> Result<String, SofError> {
709 match expr {
710 SqlExpr::Lit(v) => Ok(lower_lit(v, ctx.dialect)),
711 SqlExpr::JsonPath { root, path } => Ok(lower_json_path(root, path, ctx.dialect)),
712 SqlExpr::Param(n) => Ok(ctx.dialect.placeholder(*n)),
713 SqlExpr::ColRef(name) => Ok(name.clone()),
714 SqlExpr::Cast { inner, ty } => {
715 let inner = lower_expr(inner, ctx)?;
716 Ok(ctx.dialect.cast(&inner, *ty))
717 }
718 SqlExpr::BinOp { op, lhs, rhs } => lower_binop_dialect(*op, lhs, rhs, ctx),
719 SqlExpr::UnaryOp { op, inner } => {
720 let inner = lower_expr(inner, ctx)?;
721 Ok(match op {
722 UnaryOp::Not => format!("NOT ({inner})"),
723 UnaryOp::IsNull => format!("({inner}) IS NULL"),
724 UnaryOp::IsNotNull => format!("({inner}) IS NOT NULL"),
725 UnaryOp::Neg => format!("-({inner})"),
726 })
727 }
728 SqlExpr::Case { arms, else_ } => {
729 let mut s = String::from("CASE");
730 for (cond, val) in arms {
731 let c = lower_expr(cond, ctx)?;
732 let v = lower_expr(val, ctx)?;
733 s.push_str(&format!(" WHEN {c} THEN {v}"));
734 }
735 if let Some(e) = else_ {
736 let v = lower_expr(e, ctx)?;
737 s.push_str(&format!(" ELSE {v}"));
738 }
739 s.push_str(" END");
740 Ok(s)
741 }
742 SqlExpr::Coalesce(parts) => {
743 let parts: Result<Vec<String>, _> = parts.iter().map(|p| lower_expr(p, ctx)).collect();
744 Ok(format!("coalesce({})", parts?.join(", ")))
745 }
746 SqlExpr::NullIf(a, b) => {
747 let a = lower_expr(a, ctx)?;
748 let b = lower_expr(b, ctx)?;
749 Ok(format!("nullif({a}, {b})"))
750 }
751 SqlExpr::AsJson(inner) => {
752 let inner = lower_expr(inner, ctx)?;
753 Ok(ctx.dialect.cast(&inner, SqlType::Json))
754 }
755 SqlExpr::JsonAgg(_) | SqlExpr::Scalar(_) | SqlExpr::Exists(_) | SqlExpr::CountSub(_) => {
756 Err(SofError::Uncompilable {
757 reason: "subquery-valued expressions are not yet supported by the in-DB runner"
758 .to_string(),
759 })
760 }
761 SqlExpr::Alias { inner, .. } => lower_expr(inner, ctx),
762 SqlExpr::Boundary { side, kind, source } => {
763 let src = lower_expr(source, ctx)?;
764 Ok(lower_boundary(*side, *kind, &src, ctx.dialect))
765 }
766 SqlExpr::ScalarFromChain {
767 chain_sql,
768 projection,
769 offset,
770 } => {
771 let proj_sql = lower_expr(projection, ctx)?;
772 Ok(format!(
773 "(SELECT {proj_sql} FROM {chain_sql} LIMIT 1 OFFSET {offset})"
774 ))
775 }
776 SqlExpr::CollectionAgg { root, path } => {
777 let mut field_steps: Vec<&str> = Vec::new();
778 for step in &path.0 {
779 if let PathStep::Field(name) = step {
780 field_steps.push(name.as_str());
781 }
782 }
783 if field_steps.is_empty() {
784 return Ok(format!(
785 "(SELECT {} FROM (SELECT {root} AS v) WHERE v IS NOT NULL)",
786 ctx.dialect.json_agg("v")
787 ));
788 }
789 let lateral = ctx.dialect.lateral_keyword();
796 if field_steps.len() == 1 {
797 let src = SqlExpr::JsonPath {
798 root: root.clone(),
799 path: super::ir::JsonPath(vec![PathStep::Field(field_steps[0].to_string())]),
800 };
801 let from = if lateral.is_empty() {
802 format!("{} ca0", emit_sqlite_unnest_source(&src))
803 } else {
804 format!(
805 "{}{} AS ca0(value)",
806 lateral,
807 ctx.dialect.unnest_array(&emit_pg_unnest_source(&src))
808 )
809 };
810 let agg = ctx.dialect.json_agg("ca0.value");
811 return Ok(format!("(SELECT {agg} FROM {from})"));
812 }
813 let outer_src = SqlExpr::JsonPath {
817 root: root.clone(),
818 path: super::ir::JsonPath(vec![PathStep::Field(field_steps[0].to_string())]),
819 };
820 let leaf_field = field_steps[field_steps.len() - 1];
821 let middle_fields = &field_steps[1..field_steps.len() - 1];
822 let mut leaf_path_segs: Vec<&str> = Vec::new();
825 for m in middle_fields {
826 leaf_path_segs.push(m);
827 }
828 leaf_path_segs.push(leaf_field);
829 let leaf_value_sql = if lateral.is_empty() {
830 let mut path = String::from("$");
831 for s in &leaf_path_segs {
832 path.push('.');
833 path.push_str(s);
834 }
835 format!("json_extract(ca0.value, '{path}')")
836 } else {
837 let segs = leaf_path_segs.to_vec();
838 ctx.dialect.json_path("ca0.value", &segs)
839 };
840 let outer_from = if lateral.is_empty() {
841 format!("{} ca0", emit_sqlite_unnest_source(&outer_src))
842 } else {
843 format!(
844 "{}{} AS ca0(value)",
845 lateral,
846 ctx.dialect.unnest_array(&emit_pg_unnest_source(&outer_src))
847 )
848 };
849 if lateral.is_empty() {
853 let mut leaf_path_str = String::from("$");
859 for s in &leaf_path_segs {
860 leaf_path_str.push('.');
861 leaf_path_str.push_str(s);
862 }
863 let type_check = format!("json_type(ca0.value, '{leaf_path_str}')");
864 let guarded = format!(
865 "json_each(CASE WHEN {type_check} = 'array' \
866 THEN {leaf_value_sql} \
867 ELSE json_array({leaf_value_sql}) END)"
868 );
869 let agg = ctx.dialect.json_agg("ca1.value");
870 Ok(format!(
871 "(SELECT {agg} FROM {outer_from}, {guarded} ca1 \
872 WHERE {type_check} IS NOT NULL)"
873 ))
874 } else {
875 let guarded = format!(
878 "jsonb_array_elements(\
879 CASE WHEN jsonb_typeof({leaf_value_sql}) = 'array' \
880 THEN {leaf_value_sql} \
881 ELSE jsonb_build_array({leaf_value_sql}) END)"
882 );
883 let agg = ctx.dialect.json_agg("ca1.value");
884 Ok(format!(
885 "(SELECT {agg} FROM {outer_from} \
886 JOIN LATERAL {guarded} AS ca1(value) ON TRUE \
887 WHERE {leaf_value_sql} IS NOT NULL)"
888 ))
889 }
890 }
891 SqlExpr::JoinAggregate {
892 outer_focus,
893 outer_alias,
894 inner_field,
895 inner_alias,
896 separator,
897 } => {
898 let sep_lit = format!("'{}'", separator.replace('\'', "''"));
903 let unnest_outer = if ctx.dialect.lateral_keyword().is_empty() {
904 let src = emit_sqlite_unnest_source(outer_focus);
905 format!("FROM {src} {outer_alias}")
906 } else {
907 let src = emit_pg_unnest_source(outer_focus);
908 format!(
909 "FROM {}{} AS {outer_alias}(value)",
910 ctx.dialect.lateral_keyword(),
911 ctx.dialect.unnest_array(&src)
912 )
913 };
914 let inner_src = SqlExpr::JsonPath {
915 root: format!("{outer_alias}.value"),
916 path: super::ir::JsonPath(vec![PathStep::Field(inner_field.clone())]),
917 };
918 let unnest_inner = if ctx.dialect.lateral_keyword().is_empty() {
919 let src = emit_sqlite_unnest_source(&inner_src);
920 format!(", {src} {inner_alias}")
921 } else {
922 let src = emit_pg_unnest_source(&inner_src);
923 format!(
924 " JOIN {}{} AS {inner_alias}(value) ON TRUE",
925 ctx.dialect.lateral_keyword(),
926 ctx.dialect.unnest_array(&src)
927 )
928 };
929 let value_text = if ctx.dialect.lateral_keyword().is_empty() {
930 format!("{inner_alias}.value")
931 } else {
932 format!("({inner_alias}.value #>> '{{}}')")
933 };
934 let agg = ctx.dialect.string_agg(&value_text, &sep_lit);
935 Ok(format!("(SELECT {agg} {unnest_outer}{unnest_inner})"))
939 }
940 SqlExpr::WhereScalar {
941 focus,
942 iter_alias,
943 predicate,
944 projection,
945 } => {
946 let unnest = if ctx.dialect.lateral_keyword().is_empty() {
947 let src = emit_sqlite_unnest_source(focus);
948 format!("FROM {src} {iter_alias}")
949 } else {
950 let src = emit_pg_unnest_source(focus);
951 format!(
952 "FROM {}{} AS {iter_alias}(value)",
953 ctx.dialect.lateral_keyword(),
954 ctx.dialect.unnest_array(&src)
955 )
956 };
957 let pred_sql = lower_expr(predicate, ctx)?;
958 let proj_sql = lower_expr(projection, ctx)?;
959 Ok(format!(
960 "(SELECT {proj_sql} {unnest} WHERE {pred_sql} LIMIT 1)"
961 ))
962 }
963 SqlExpr::WhereExists {
964 focus,
965 iter_alias,
966 predicate,
967 negate,
968 } => {
969 let unnest = if ctx.dialect.lateral_keyword().is_empty() {
970 let src = emit_sqlite_unnest_source(focus);
971 format!("FROM {src} {iter_alias}")
972 } else {
973 let src = emit_pg_unnest_source(focus);
974 format!(
975 "FROM {}{} AS {iter_alias}(value)",
976 ctx.dialect.lateral_keyword(),
977 ctx.dialect.unnest_array(&src)
978 )
979 };
980 let pred_sql = lower_expr(predicate, ctx)?;
981 let kw = if *negate { "NOT EXISTS" } else { "EXISTS" };
982 Ok(format!("{kw} (SELECT 1 {unnest} WHERE {pred_sql})"))
983 }
984 SqlExpr::ReferenceKey {
985 reference,
986 expected_type,
987 } => {
988 let ref_sql = lower_expr(reference, ctx)?;
989 let last = ctx.dialect.last_path_segment(&ref_sql);
990 match expected_type {
991 None => Ok(last),
992 Some(ty) => {
993 let p1 = format!("{ty}/%").replace('\'', "''");
999 let p2 = format!("%/{ty}/%").replace('\'', "''");
1000 Ok(format!(
1001 "CASE WHEN {ref_sql} LIKE '{p1}' OR {ref_sql} LIKE '{p2}' \
1002 THEN {last} ELSE NULL END"
1003 ))
1004 }
1005 }
1006 }
1007 }
1008}
1009
1010fn project_text(expr: &SqlExpr, lowered: &str, dialect: &dyn Dialect) -> String {
1022 match expr {
1023 SqlExpr::JsonPath { path, .. } if path.is_empty() => {
1024 if dialect.name() == "postgres" {
1025 format!("({lowered})#>>'{{}}'")
1026 } else {
1027 lowered.to_string()
1028 }
1029 }
1030 SqlExpr::JsonPath { .. } | SqlExpr::Lit(_) => lowered.to_string(),
1031 _ => dialect.cast(lowered, SqlType::Text),
1032 }
1033}
1034
1035fn lower_lit(v: &LitValue, dialect: &dyn Dialect) -> String {
1036 match v {
1037 LitValue::Null => "NULL".to_string(),
1038 LitValue::Bool(true) => dialect.bool_true().to_string(),
1039 LitValue::Bool(false) => dialect.bool_false().to_string(),
1040 LitValue::Int(n) => n.to_string(),
1041 LitValue::Decimal(s) => s.clone(),
1042 LitValue::Str(s) => format!("'{}'", s.replace('\'', "''")),
1045 }
1046}
1047
1048fn lower_json_path(root: &str, path: &JsonPath, dialect: &dyn Dialect) -> String {
1049 if path.is_empty() {
1050 return root.to_string();
1051 }
1052 let raw_segments: Vec<String> = path
1055 .0
1056 .iter()
1057 .filter_map(|step| match step {
1058 PathStep::Field(name) => Some(name.clone()),
1059 PathStep::Index(n) => Some(n.to_string()),
1060 PathStep::OfType(_) | PathStep::TypeFilter(_) => None,
1061 })
1062 .collect();
1063 if raw_segments.is_empty() {
1064 return root.to_string();
1065 }
1066
1067 let field_count = path
1079 .0
1080 .iter()
1081 .filter(|s| matches!(s, PathStep::Field(_)))
1082 .count();
1083 let trailing_zero_from_first =
1084 matches!(path.0.last(), Some(PathStep::Index(0))) && field_count >= 2;
1085 let other_indices = path
1086 .0
1087 .iter()
1088 .enumerate()
1089 .any(|(i, s)| matches!(s, PathStep::Index(_)) && i + 1 != path.0.len());
1090
1091 let segs: Vec<&str> = raw_segments.iter().map(String::as_str).collect();
1092
1093 if trailing_zero_from_first && !other_indices {
1098 let mut interleaved: Vec<String> = Vec::new();
1099 let mut first_field_seen = false;
1100 for step in &path.0[..path.0.len() - 1] {
1101 match step {
1102 PathStep::Field(n) => {
1103 interleaved.push(n.clone());
1104 if !first_field_seen {
1105 interleaved.push("0".to_string());
1106 first_field_seen = true;
1107 }
1108 }
1109 PathStep::Index(n) => interleaved.push(n.to_string()),
1110 _ => {}
1111 }
1112 }
1113 let lifted: Vec<&str> = interleaved.iter().map(String::as_str).collect();
1114 return dialect.json_path_text(root, &lifted);
1115 }
1116
1117 let already_indexed =
1118 other_indices || matches!(path.0.last(), Some(PathStep::Index(_))) && field_count < 2;
1119
1120 if field_count >= 2 && !already_indexed {
1124 let array_segs: Vec<String> = path
1125 .0
1126 .iter()
1127 .enumerate()
1128 .flat_map(|(i, step)| match step {
1129 PathStep::Field(name) if i == 0 => vec![name.clone(), "0".to_string()],
1130 PathStep::Field(name) => vec![name.clone()],
1131 PathStep::Index(n) => vec![n.to_string()],
1132 _ => Vec::new(),
1133 })
1134 .collect();
1135 let array_refs: Vec<&str> = array_segs.iter().map(String::as_str).collect();
1136 return format!(
1137 "coalesce({}, {})",
1138 dialect.json_path_text(root, &array_refs),
1139 dialect.json_path_text(root, &segs)
1140 );
1141 }
1142
1143 dialect.json_path_text(root, &segs)
1144}
1145
1146fn lower_binop(op: BinOp) -> &'static str {
1147 match op {
1148 BinOp::Eq => "=",
1149 BinOp::Neq => "!=",
1150 BinOp::Lt => "<",
1151 BinOp::Lte => "<=",
1152 BinOp::Gt => ">",
1153 BinOp::Gte => ">=",
1154 BinOp::Add => "+",
1155 BinOp::Sub => "-",
1156 BinOp::Mul => "*",
1157 BinOp::Div => "/",
1158 BinOp::And => "AND",
1159 BinOp::Or => "OR",
1160 BinOp::Concat => "||",
1161 BinOp::Like => "LIKE",
1162 BinOp::RegexMatch => "~",
1163 }
1164}
1165
1166fn lower_binop_dialect(
1183 op: BinOp,
1184 lhs: &SqlExpr,
1185 rhs: &SqlExpr,
1186 ctx: &mut ExprCtx<'_>,
1187) -> Result<String, SofError> {
1188 if ctx.dialect.name() != "postgres" {
1189 let l = lower_expr(lhs, ctx)?;
1190 let r = lower_expr(rhs, ctx)?;
1191 return Ok(format!("({l} {} {r})", lower_binop(op)));
1192 }
1193
1194 let op_sql = lower_binop(op);
1195
1196 match op {
1197 BinOp::Eq | BinOp::Neq => {
1198 if let Some(b) = bool_literal(rhs) {
1201 let l = lower_expr(lhs, ctx)?;
1202 let lit = if b { "'true'" } else { "'false'" };
1203 return Ok(format!("({l} {op_sql} {lit})"));
1204 }
1205 if let Some(b) = bool_literal(lhs) {
1206 let r = lower_expr(rhs, ctx)?;
1207 let lit = if b { "'true'" } else { "'false'" };
1208 return Ok(format!("({lit} {op_sql} {r})"));
1209 }
1210
1211 if is_numeric_literal(rhs) {
1213 let l = lower_expr(lhs, ctx)?;
1214 let r = lower_expr(rhs, ctx)?;
1215 return Ok(format!("({} {op_sql} {r})", cast_pg_numeric(lhs, &l)));
1216 }
1217 if is_numeric_literal(lhs) {
1218 let l = lower_expr(lhs, ctx)?;
1219 let r = lower_expr(rhs, ctx)?;
1220 return Ok(format!("({l} {op_sql} {})", cast_pg_numeric(rhs, &r)));
1221 }
1222
1223 let l = lower_expr(lhs, ctx)?;
1226 let r = lower_expr(rhs, ctx)?;
1227 Ok(format!("({l} {op_sql} {r})"))
1228 }
1229 BinOp::Lt | BinOp::Lte | BinOp::Gt | BinOp::Gte => {
1230 let l = lower_expr(lhs, ctx)?;
1231 let r = lower_expr(rhs, ctx)?;
1232 Ok(format!(
1233 "({} {op_sql} {})",
1234 cast_pg_numeric(lhs, &l),
1235 cast_pg_numeric(rhs, &r)
1236 ))
1237 }
1238 BinOp::Add | BinOp::Sub | BinOp::Mul | BinOp::Div => {
1239 let l = lower_expr(lhs, ctx)?;
1240 let r = lower_expr(rhs, ctx)?;
1241 Ok(format!(
1242 "({} {op_sql} {})",
1243 cast_pg_numeric(lhs, &l),
1244 cast_pg_numeric(rhs, &r)
1245 ))
1246 }
1247 BinOp::And | BinOp::Or => {
1248 let l = lower_expr(lhs, ctx)?;
1252 let r = lower_expr(rhs, ctx)?;
1253 Ok(format!("(({l})::boolean {op_sql} ({r})::boolean)"))
1254 }
1255 BinOp::Concat | BinOp::Like | BinOp::RegexMatch => {
1256 let l = lower_expr(lhs, ctx)?;
1257 let r = lower_expr(rhs, ctx)?;
1258 Ok(format!("({l} {op_sql} {r})"))
1259 }
1260 }
1261}
1262
1263fn bool_literal(e: &SqlExpr) -> Option<bool> {
1264 match e {
1265 SqlExpr::Lit(LitValue::Bool(b)) => Some(*b),
1266 _ => None,
1267 }
1268}
1269
1270fn is_numeric_literal(e: &SqlExpr) -> bool {
1271 matches!(
1272 e,
1273 SqlExpr::Lit(LitValue::Int(_)) | SqlExpr::Lit(LitValue::Decimal(_))
1274 )
1275}
1276
1277fn cast_pg_numeric(expr: &SqlExpr, lowered: &str) -> String {
1289 if is_numeric_literal(expr) {
1290 return lowered.to_string();
1291 }
1292 if matches!(expr, SqlExpr::Param(_) | SqlExpr::Lit(LitValue::Str(_))) {
1293 return format!("({lowered}::text)::numeric");
1294 }
1295 format!("({lowered})::numeric")
1296}
1297
1298fn emit_sqlite_unnest_source(source: &SqlExpr) -> String {
1310 if let SqlExpr::JsonPath { root, path } = source {
1311 let segments_owned: Vec<String> = path
1312 .0
1313 .iter()
1314 .filter_map(|s| match s {
1315 PathStep::Field(n) => Some(n.clone()),
1316 PathStep::Index(n) => Some(n.to_string()),
1317 _ => None,
1318 })
1319 .collect();
1320 let segments: Vec<&str> = segments_owned.iter().map(String::as_str).collect();
1321 let path_step_count = path
1322 .0
1323 .iter()
1324 .filter(|s| matches!(s, PathStep::Field(_) | PathStep::Index(_)))
1325 .count();
1326 if segments.len() == path_step_count && !segments.is_empty() {
1327 let mut path_str = String::from("$");
1330 for s in &segments {
1331 if s.chars().all(|c| c.is_ascii_digit()) {
1332 path_str.push('[');
1333 path_str.push_str(s);
1334 path_str.push(']');
1335 } else {
1336 path_str.push('.');
1337 path_str.push_str(s);
1338 }
1339 }
1340 let has_index = path.0.iter().any(|s| matches!(s, PathStep::Index(_)));
1347 if root == "r.data" && !has_index {
1348 return format!("json_each({root}, '{path_str}')");
1349 }
1350 let extracted = format!("json_extract({root}, '{path_str}')");
1351 let type_check = format!("json_type({root}, '{path_str}')");
1352 return format!(
1359 "json_each(CASE WHEN {type_check} = 'array' THEN {extracted} \
1360 WHEN {type_check} IN ('object', 'array') THEN json_array(json({extracted})) \
1361 WHEN {type_check} IS NOT NULL THEN json_array({extracted}) \
1362 ELSE '[]' END)"
1363 );
1364 }
1365 }
1366 let mut ctx = ExprCtx::new(&super::dialect::SqliteDialect, 3);
1367 let computed = lower_expr(source, &mut ctx).unwrap_or_else(|_| "NULL".to_string());
1368 format!("json_each(coalesce({computed}, '[]'))")
1369}
1370
1371fn emit_pg_unnest_source(source: &SqlExpr) -> String {
1382 let raw = if let SqlExpr::JsonPath { root, path } = source {
1383 let segments: Vec<String> = path
1384 .0
1385 .iter()
1386 .filter_map(|s| match s {
1387 PathStep::Field(n) => Some(n.clone()),
1388 PathStep::Index(n) => Some(n.to_string()),
1389 _ => None,
1390 })
1391 .collect();
1392 if segments.is_empty() {
1393 root.clone()
1394 } else if segments.len() == 1 {
1395 format!("{root}->'{}'", segments[0])
1396 } else {
1397 format!("{root}#>'{{{}}}'", segments.join(","))
1398 }
1399 } else {
1400 let mut ctx = ExprCtx::new(&super::dialect::PgDialect, 3);
1406 let inner = lower_expr(source, &mut ctx).unwrap_or_else(|_| "NULL".to_string());
1407 format!("({inner})::jsonb")
1408 };
1409 format!(
1410 "(CASE WHEN jsonb_typeof({raw}) = 'array' THEN {raw} \
1411 WHEN jsonb_typeof({raw}) IS NOT NULL THEN jsonb_build_array({raw}) \
1412 ELSE '[]'::jsonb END)"
1413 )
1414}
1415
1416fn lower_boundary(
1426 side: BoundarySide,
1427 kind: BoundaryKind,
1428 src: &str,
1429 dialect: &dyn Dialect,
1430) -> String {
1431 let is_sqlite = dialect.lateral_keyword().is_empty();
1432 let dot_pos = if is_sqlite {
1436 format!("instr({src}, '.')")
1437 } else {
1438 format!("position('.' in {src})")
1439 };
1440 let alpha_check = if is_sqlite {
1443 format!("({src}) || '' GLOB '*[A-Za-z]*'")
1444 } else {
1445 format!("({src})::text ~ '[A-Za-z]'")
1446 };
1447 match kind {
1448 BoundaryKind::Decimal => {
1449 let len_after_dot = format!(
1456 "(length({src}) - CASE WHEN {dot_pos} = 0 \
1457 THEN length({src}) \
1458 ELSE {dot_pos} END)"
1459 );
1460 let half_step = format!(
1467 "CASE {len_after_dot} \
1468 WHEN 0 THEN 0.5 \
1469 WHEN 1 THEN 0.05 \
1470 WHEN 2 THEN 0.005 \
1471 WHEN 3 THEN 0.0005 \
1472 WHEN 4 THEN 0.00005 \
1473 WHEN 5 THEN 0.000005 \
1474 WHEN 6 THEN 0.0000005 \
1475 ELSE 0.00000005 END"
1476 );
1477 let op = match side {
1478 BoundarySide::Low => "-",
1479 BoundarySide::High => "+",
1480 };
1481 let numeric_src = if is_sqlite {
1484 format!("({src})")
1485 } else {
1486 format!("({src})::numeric")
1487 };
1488 format!(
1491 "CASE WHEN {src} IS NULL THEN NULL \
1492 WHEN {alpha_check} THEN NULL \
1493 ELSE {numeric_src} {op} {half_step} END"
1494 )
1495 }
1496 BoundaryKind::Date => {
1497 let pad_month_only = match side {
1499 BoundarySide::Low => "'-01-01'",
1500 BoundarySide::High => "'-12-31'",
1501 };
1502 let day_pad = match side {
1503 BoundarySide::Low => "'-01'".to_string(),
1504 BoundarySide::High => format!(
1505 "'-' || CASE substr({src}, 6, 2) \
1506 WHEN '02' THEN '28' \
1507 WHEN '04' THEN '30' \
1508 WHEN '06' THEN '30' \
1509 WHEN '09' THEN '30' \
1510 WHEN '11' THEN '30' \
1511 ELSE '31' END"
1512 ),
1513 };
1514 format!(
1515 "CASE \
1516 WHEN {src} IS NULL THEN NULL \
1517 WHEN length({src}) = 10 THEN {src} \
1518 WHEN length({src}) = 7 THEN {src} || {day_pad} \
1519 WHEN length({src}) = 4 THEN {src} || {pad_month_only} \
1520 ELSE NULL END"
1521 )
1522 }
1523 BoundaryKind::DateTime => {
1524 let pad_full_day = match side {
1542 BoundarySide::Low => "'T00:00:00.000+14:00'",
1543 BoundarySide::High => "'T23:59:59.999-12:00'",
1544 };
1545 let pad_month_only = match side {
1546 BoundarySide::Low => "'-01-01'",
1547 BoundarySide::High => "'-12-31'",
1548 };
1549 let day_pad = match side {
1550 BoundarySide::Low => "'-01'".to_string(),
1551 BoundarySide::High => format!(
1552 "'-' || CASE substr({src}, 6, 2) \
1553 WHEN '02' THEN '28' \
1554 WHEN '04' THEN '30' \
1555 WHEN '06' THEN '30' \
1556 WHEN '09' THEN '30' \
1557 WHEN '11' THEN '30' \
1558 ELSE '31' END"
1559 ),
1560 };
1561 format!(
1562 "CASE \
1563 WHEN {src} IS NULL THEN NULL \
1564 WHEN length({src}) = 10 THEN {src} || {pad_full_day} \
1565 WHEN length({src}) = 7 THEN {src} || {day_pad} \
1566 WHEN length({src}) = 4 THEN {src} || {pad_month_only} \
1567 ELSE NULL END"
1568 )
1569 }
1570 BoundaryKind::Time => {
1571 let pad = match side {
1572 BoundarySide::Low => "':00.000'",
1573 BoundarySide::High => "':59.999'",
1574 };
1575 format!(
1576 "CASE \
1577 WHEN {src} IS NULL THEN NULL \
1578 WHEN length({src}) = 5 THEN {src} || {pad} \
1579 ELSE NULL END"
1580 )
1581 }
1582 }
1583}
1584
/// Cuts `sql` at its last `ORDER BY` clause, matched case-insensitively.
///
/// The last occurrence of `"\nORDER BY"` is preferred; only when there is none is the
/// last occurrence of `" ORDER BY"` used. The returned slice ends just before the
/// matched newline/space. Without a match the input is returned unchanged.
fn strip_trailing_order_by(sql: &str) -> &str {
    // ASCII upper-casing leaves byte offsets intact, so positions found in the folded
    // copy are valid slice bounds for the original string.
    let folded = sql.to_ascii_uppercase();
    let cut = folded
        .rfind("\nORDER BY")
        .or_else(|| folded.rfind(" ORDER BY"));
    match cut {
        Some(end) => &sql[..end],
        None => sql,
    }
}
1598
1599fn sanitize_ident(name: &str) -> Result<&str, SofError> {
1603 if name.contains('"') || name.contains('\0') {
1604 return Err(SofError::InvalidViewDefinition(format!(
1605 "column name '{name}' contains an unsupported character"
1606 )));
1607 }
1608 Ok(name)
1609}
1610
1611const _: Option<JsonType> = None;