Skip to main content

dbrest_core/query/
builder.rs

1//! Plan-to-SQL conversion functions.
2//!
3//! Converts typed plan trees (`ReadPlanTree`, `MutatePlan`, `CallPlan`) into
4//! parameterised SQL queries. This module sits between the plan layer and the
5//! statement assemblers: the plan describes *what* to query, and this module
6//! decides *how* to express it in SQL.
7//!
8//! # Pipeline
9//!
10//! ```text
11//! ReadPlanTree  ──▶ read_plan_to_query()       ──▶ SqlBuilder (SELECT …)
12//! ReadPlanTree  ──▶ read_plan_to_count_query()  ──▶ SqlBuilder (SELECT COUNT(*) …)
13//! MutatePlan    ──▶ mutate_plan_to_query()      ──▶ SqlBuilder (INSERT / UPDATE / DELETE …)
14//! CallPlan      ──▶ call_plan_to_query()         ──▶ SqlBuilder (SELECT * FROM func(…))
15//! ```
16
17use crate::api_request::types::Payload;
18use crate::backend::SqlDialect;
19use crate::plan::call_plan::{CallArgs, CallParams, CallPlan, RpcParamValue};
20use crate::plan::mutate_plan::{DeletePlan, InsertPlan, MutatePlan, UpdatePlan};
21use crate::plan::read_plan::ReadPlanTree;
22
23use super::fragment;
24use super::sql_builder::{SqlBuilder, SqlParam};
25
26// ==========================================================================
27// Read plan → SQL
28// ==========================================================================
29
30/// Convert a `ReadPlanTree` into a SELECT query.
31///
32/// Generates a recursive SELECT with lateral joins for embedded resources.
33/// Each child in the tree becomes a lateral subquery that is joined to the
34/// parent.
35///
36/// # Behaviour
37///
38/// - Root node produces the main `SELECT … FROM …`
39/// - Each child becomes a `LATERAL (SELECT …) AS alias` joined via ON conditions
40/// - Filters, order, limit/offset, and group-by are applied per node
41/// - JSON aggregation is used for to-many relations
42///
43/// # SQL Example
44///
45/// ```sql
46/// SELECT "public"."users"."id" AS "id",
47///        "public"."users"."name" AS "name",
48///        _dbrst_agg_1.body AS "posts"
49/// FROM "public"."users"
50/// LEFT JOIN LATERAL (
51///   SELECT coalesce(json_agg(_dbrst_t), '[]')::text AS body
52///   FROM (
53///     SELECT "public"."posts"."id" AS "id",
54///            "public"."posts"."title" AS "title"
55///     FROM "public"."posts"
56///     WHERE "public"."posts"."user_id" = "public"."users"."id"
57///   ) AS _dbrst_t
58/// ) AS _dbrst_agg_1 ON TRUE
59/// WHERE "public"."users"."id" = $1
60/// ORDER BY "public"."users"."name" ASC
61/// LIMIT 10
62/// ```
63pub fn read_plan_to_query(tree: &ReadPlanTree, dialect: &dyn SqlDialect) -> SqlBuilder {
64    let plan = &tree.node;
65    let qi = &plan.from;
66
67    let mut b = SqlBuilder::new();
68
69    // SELECT clause
70    b.push("SELECT ");
71    if plan.select.is_empty() {
72        // Default: select all columns
73        b.push_qi(qi);
74        b.push(".*");
75    } else {
76        b.push_separated(", ", &plan.select, |b, sel| {
77            fragment::fmt_select_item(b, qi, sel, dialect);
78        });
79    }
80
81    if dialect.supports_lateral_join() {
82        // Add join select expressions for children (references LATERAL JOIN aliases)
83        for child in &tree.forest {
84            b.push(", ");
85            let agg_alias = &child.node.rel_agg_alias;
86            b.push_ident(agg_alias);
87            b.push(".body");
88            let sel_name = child
89                .node
90                .rel_alias
91                .as_ref()
92                .unwrap_or(&child.node.rel_name);
93            b.push(" AS ");
94            b.push_ident(sel_name);
95        }
96    } else {
97        // Correlated subqueries for children (no LATERAL JOIN support)
98        for child in &tree.forest {
99            b.push(", ");
100            let is_to_one = child
101                .node
102                .rel_to_parent
103                .as_ref()
104                .map(|r| r.is_to_one())
105                .unwrap_or(false);
106
107            b.push("(SELECT ");
108            if is_to_one {
109                dialect.row_to_json(&mut b, "_dbrst_t");
110            } else {
111                dialect.json_agg(&mut b, "_dbrst_t");
112            }
113            b.push(" FROM (");
114            let child_query = read_plan_to_query(child, dialect);
115            b.push_builder(&child_query);
116            b.push(") AS ");
117            b.push_ident("_dbrst_t");
118            b.push(")");
119
120            let sel_name = child
121                .node
122                .rel_alias
123                .as_ref()
124                .unwrap_or(&child.node.rel_name);
125            b.push(" AS ");
126            b.push_ident(sel_name);
127        }
128    }
129
130    // FROM clause
131    b.push(" FROM ");
132    b.push_qi(qi);
133    if let Some(ref alias) = plan.from_alias {
134        b.push(" AS ");
135        b.push_ident(alias);
136    }
137
138    // LATERAL JOINs for children (only when supported)
139    if dialect.supports_lateral_join() {
140        for child in &tree.forest {
141            let is_inner = child
142                .node
143                .rel_join_type
144                .map(|jt| matches!(jt, crate::api_request::types::JoinType::Inner))
145                .unwrap_or(false);
146
147            let join_type = if is_inner {
148                "INNER JOIN LATERAL"
149            } else {
150                "LEFT JOIN LATERAL"
151            };
152
153            b.push(" ");
154            b.push(join_type);
155            b.push(" (");
156
157            // Inner aggregation subquery
158            let is_to_one = child
159                .node
160                .rel_to_parent
161                .as_ref()
162                .map(|r| r.is_to_one())
163                .unwrap_or(false);
164
165            if is_to_one {
166                b.push("SELECT ");
167                dialect.row_to_json(&mut b, "_dbrst_t");
168                b.push(" AS body FROM (");
169            } else {
170                b.push("SELECT ");
171                dialect.json_agg(&mut b, "_dbrst_t");
172                b.push(" AS body FROM (");
173            }
174
175            // Recursive child query
176            let child_query = read_plan_to_query(child, dialect);
177            b.push_builder(&child_query);
178
179            b.push(") AS ");
180            b.push_ident("_dbrst_t");
181            b.push(") AS ");
182            b.push_ident(&child.node.rel_agg_alias);
183            b.push(" ON TRUE");
184        }
185    }
186
187    // WHERE clause (includes join conditions for child nodes)
188    let mut has_where = false;
189    if !plan.where_.is_empty() {
190        fragment::where_clause(&mut b, qi, &plan.where_, dialect);
191        has_where = true;
192    }
193
194    // Join conditions to parent (for child nodes)
195    if !plan.rel_join_conds.is_empty() {
196        if has_where {
197            b.push(" AND ");
198        } else {
199            b.push(" WHERE ");
200        }
201        b.push_separated(" AND ", &plan.rel_join_conds, |b, jc| {
202            fragment::fmt_join_condition(b, jc);
203        });
204    }
205
206    // GROUP BY
207    fragment::group_clause(&mut b, qi, &plan.select);
208
209    // ORDER BY
210    fragment::order_clause(&mut b, qi, &plan.order);
211
212    // LIMIT / OFFSET
213    fragment::limit_offset(&mut b, plan.range.offset, plan.range.limit_to);
214
215    b
216}
217
218/// Convert a `ReadPlanTree` into a COUNT query.
219///
220/// Produces `SELECT COUNT(*) FROM (source_query) AS _dbrst_count_t`.
221///
222/// # SQL Example
223/// ```sql
224/// SELECT COUNT(*) AS "dbrst_filtered_count"
225/// FROM (SELECT … FROM "public"."users" WHERE …) AS _dbrst_count_t
226/// ```
227pub fn read_plan_to_count_query(tree: &ReadPlanTree, dialect: &dyn SqlDialect) -> SqlBuilder {
228    let mut b = SqlBuilder::new();
229    fragment::count_f(&mut b, dialect);
230
231    // Build the inner query without LIMIT/OFFSET for accurate counting
232    let plan = &tree.node;
233    let qi = &plan.from;
234
235    b.push(" FROM (SELECT 1 FROM ");
236    b.push_qi(qi);
237
238    // WHERE clause
239    if !plan.where_.is_empty() {
240        fragment::where_clause(&mut b, qi, &plan.where_, dialect);
241    }
242
243    b.push(") AS _dbrst_count_t");
244
245    b
246}
247
248// ==========================================================================
249// Mutate plan → SQL
250// ==========================================================================
251
252/// Convert a `MutatePlan` into an INSERT, UPDATE, or DELETE query.
253///
254/// # Behaviour
255///
256/// - **INSERT**: `INSERT INTO … SELECT … FROM json_to_recordset($1) … RETURNING …`
257/// - **UPDATE**: `UPDATE … SET (cols) = (SELECT … FROM json_to_recordset($1) …) WHERE … RETURNING …`
258/// - **DELETE**: `DELETE FROM … WHERE … RETURNING …`
259///
260/// # SQL Example
261/// ```sql
262/// -- Insert
263/// INSERT INTO "public"."users"("id", "name")
264/// SELECT "id", "name" FROM json_to_recordset($1) AS _("id" integer, "name" text)
265/// RETURNING "id" AS "id", "name" AS "name"
266///
267/// -- Update
268/// UPDATE "public"."users" SET ("name") =
269///   (SELECT "name" FROM json_to_recordset($1) AS _("name" text))
270/// WHERE "id" = $2
271/// RETURNING "id" AS "id", "name" AS "name"
272///
273/// -- Delete
274/// DELETE FROM "public"."users" WHERE "id" = $1
275/// RETURNING "id" AS "id"
276/// ```
277pub fn mutate_plan_to_query(plan: &MutatePlan, dialect: &dyn SqlDialect) -> SqlBuilder {
278    match plan {
279        MutatePlan::Insert(insert) => insert_to_query(insert, dialect),
280        MutatePlan::Update(update) => update_to_query(update, dialect),
281        MutatePlan::Delete(delete) => delete_to_query(delete, dialect),
282    }
283}
284
285/// Generate an INSERT query from an `InsertPlan`.
286///
287/// # Behaviour
288///
289/// - If `columns` is empty, emits `INSERT INTO … DEFAULT VALUES`
290/// - Otherwise, emits `INSERT INTO …(cols) SELECT cols FROM json_to_recordset($1) AS _(…)`
291/// - Appends ON CONFLICT clause if present (DO UPDATE SET or DO NOTHING)
292/// - Appends WHERE and RETURNING clauses from the plan
293///
294/// # SQL Example
295///
296/// ```sql
297/// INSERT INTO "public"."users"("id", "name")
298/// SELECT "id", "name" FROM json_to_recordset($1) AS _("id" integer, "name" text)
299/// ON CONFLICT("id") DO UPDATE SET "name" = EXCLUDED."name"
300/// RETURNING "id" AS "id", "name" AS "name"
301/// ```
302fn insert_to_query(plan: &InsertPlan, dialect: &dyn SqlDialect) -> SqlBuilder {
303    let qi = &plan.into;
304    let mut b = SqlBuilder::new();
305
306    b.push("INSERT INTO ");
307    b.push_qi(qi);
308
309    if plan.columns.is_empty() {
310        // Empty insert (DEFAULT VALUES)
311        b.push(" DEFAULT VALUES");
312    } else {
313        // Column list
314        b.push("(");
315        b.push_separated(", ", &plan.columns, |b, col| {
316            b.push_ident(&col.name);
317        });
318        b.push(")");
319
320        // SELECT from JSON body
321        b.push(" SELECT ");
322        b.push_separated(", ", &plan.columns, |b, col| {
323            b.push_ident(&col.name);
324        });
325        b.push(" FROM ");
326
327        let json_bytes = payload_to_bytes(&plan.body);
328        fragment::from_json_body(&mut b, &plan.columns, &json_bytes, dialect);
329    }
330
331    // ON CONFLICT
332    if let Some(ref oc) = plan.on_conflict {
333        b.push(" ON CONFLICT(");
334        b.push_separated(", ", &oc.columns, |b, col| {
335            b.push_ident(col);
336        });
337        b.push(")");
338
339        if oc.merge_duplicates {
340            b.push(" DO UPDATE SET ");
341            b.push_separated(", ", &plan.columns, |b, col| {
342                b.push_ident(&col.name);
343                b.push(" = EXCLUDED.");
344                b.push_ident(&col.name);
345            });
346        } else {
347            b.push(" DO NOTHING");
348        }
349    }
350
351    // WHERE
352    fragment::where_clause(&mut b, qi, &plan.where_, dialect);
353
354    // RETURNING
355    fragment::returning_clause(&mut b, qi, &plan.returning, dialect);
356
357    b
358}
359
360/// Generate an UPDATE query from an `UpdatePlan`.
361///
362/// # Behaviour
363///
364/// - Single column: `UPDATE … SET "col" = (SELECT "col" FROM json_to_recordset(…))`
365/// - Multiple columns: `UPDATE … SET (cols) = (SELECT cols FROM json_to_recordset(…))`
366/// - The JSON body is unpacked via `json_to_recordset` with typed column definitions
367/// - Appends WHERE and RETURNING clauses from the plan
368///
369/// # SQL Example
370///
371/// ```sql
372/// UPDATE "public"."users" SET "name" = (SELECT "name"
373///   FROM json_to_recordset($1) AS _("name" text))
374/// WHERE "public"."users"."id"=$2
375/// RETURNING "id" AS "id", "name" AS "name"
376/// ```
377fn update_to_query(plan: &UpdatePlan, dialect: &dyn SqlDialect) -> SqlBuilder {
378    let qi = &plan.into;
379    let mut b = SqlBuilder::new();
380
381    b.push("UPDATE ");
382    b.push_qi(qi);
383    b.push(" SET ");
384
385    if plan.columns.len() == 1 {
386        b.push_ident(&plan.columns[0].name);
387        b.push(" = (SELECT ");
388        b.push_ident(&plan.columns[0].name);
389    } else {
390        b.push("(");
391        b.push_separated(", ", &plan.columns, |b, col| {
392            b.push_ident(&col.name);
393        });
394        b.push(") = (SELECT ");
395        b.push_separated(", ", &plan.columns, |b, col| {
396            b.push_ident(&col.name);
397        });
398    }
399
400    b.push(" FROM ");
401    let json_bytes = payload_to_bytes(&plan.body);
402    fragment::from_json_body(&mut b, &plan.columns, &json_bytes, dialect);
403    b.push(")");
404
405    // WHERE
406    fragment::where_clause(&mut b, qi, &plan.where_, dialect);
407
408    // RETURNING
409    fragment::returning_clause(&mut b, qi, &plan.returning, dialect);
410
411    b
412}
413
414/// Generate a DELETE query from a `DeletePlan`.
415///
416/// # Behaviour
417///
418/// Emits `DELETE FROM "schema"."table"` with optional WHERE and RETURNING
419/// clauses. The simplest of the mutation queries.
420///
421/// # SQL Example
422///
423/// ```sql
424/// DELETE FROM "public"."users"
425/// WHERE "public"."users"."id"=$1
426/// RETURNING "id" AS "id"
427/// ```
428fn delete_to_query(plan: &DeletePlan, dialect: &dyn SqlDialect) -> SqlBuilder {
429    let qi = &plan.from;
430    let mut b = SqlBuilder::new();
431
432    b.push("DELETE FROM ");
433    b.push_qi(qi);
434
435    // WHERE
436    fragment::where_clause(&mut b, qi, &plan.where_, dialect);
437
438    // RETURNING
439    fragment::returning_clause(&mut b, qi, &plan.returning, dialect);
440
441    b
442}
443
444// ==========================================================================
445// Call plan → SQL
446// ==========================================================================
447
448/// Convert a `CallPlan` into a function call query.
449///
450/// # Behaviour
451///
452/// - Named parameters: `SELECT * FROM "schema"."func"("p1" := $1, "p2" := $2)`
453/// - JSON body: `SELECT * FROM "schema"."func"($1::jsonb)` (for single-param JSON functions)
454/// - No args: `SELECT * FROM "schema"."func"()`
455///
456/// # SQL Example
457/// ```sql
458/// SELECT * FROM "public"."add_numbers"("a" := $1, "b" := $2)
459/// SELECT * FROM "public"."get_data"($1::jsonb)
460/// ```
461pub fn call_plan_to_query(plan: &CallPlan, dialect: &dyn SqlDialect) -> SqlBuilder {
462    let _ = dialect; // Available for future use (named_param_assign, variadic syntax)
463    let mut b = SqlBuilder::new();
464
465    b.push("SELECT * FROM ");
466    b.push_qi(&plan.qi);
467    b.push("(");
468
469    match &plan.args {
470        CallArgs::DirectArgs(args) => {
471            match &plan.params {
472                CallParams::KeyParams(params) => {
473                    let mut first = true;
474                    for param in params {
475                        if let Some(val) = args.get(&param.name) {
476                            if !first {
477                                b.push(", ");
478                            }
479                            first = false;
480                            b.push_ident(&param.name);
481                            b.push(" := ");
482                            match val {
483                                RpcParamValue::Fixed(v) => {
484                                    b.push_param(SqlParam::Text(v.to_string()));
485                                }
486                                RpcParamValue::Variadic(vals) => {
487                                    b.push("VARIADIC ARRAY[");
488                                    for (i, v) in vals.iter().enumerate() {
489                                        if i > 0 {
490                                            b.push(", ");
491                                        }
492                                        b.push_param(SqlParam::Text(v.to_string()));
493                                    }
494                                    b.push("]");
495                                }
496                            }
497                        }
498                    }
499                }
500                CallParams::OnePosParam(_) => {
501                    // Single positional — take first value
502                    if let Some((_, val)) = args.iter().next() {
503                        match val {
504                            RpcParamValue::Fixed(v) => {
505                                b.push_param(SqlParam::Text(v.to_string()));
506                            }
507                            RpcParamValue::Variadic(vals) => {
508                                b.push_param(SqlParam::Text(vals.join(",").to_string()));
509                            }
510                        }
511                    }
512                }
513            }
514        }
515        CallArgs::JsonArgs(json) => {
516            if let Some(body) = json {
517                b.push_param(SqlParam::Json(body.clone()));
518            }
519        }
520    }
521
522    b.push(")");
523
524    b
525}
526
527// ==========================================================================
528// Helpers
529// ==========================================================================
530
531/// Extract raw bytes from a `Payload` for use in `json_to_recordset()`.
532///
533/// # Behaviour
534///
535/// - `ProcessedJSON` / `RawJSON` / `RawPayload` — returns the raw bytes directly
536/// - `ProcessedUrlEncoded` — converts key-value pairs to a JSON object string
537///   so they can be consumed by `json_to_recordset`
538fn payload_to_bytes(payload: &Payload) -> Vec<u8> {
539    match payload {
540        Payload::ProcessedJSON { raw, .. } => raw.to_vec(),
541        Payload::RawJSON(raw) => raw.to_vec(),
542        Payload::RawPayload(raw) => raw.to_vec(),
543        Payload::ProcessedUrlEncoded { params, .. } => {
544            // Convert URL-encoded params to JSON for json_to_recordset
545            let json = serde_json::json!(
546                params
547                    .iter()
548                    .map(|(k, v)| (k.as_str(), v.as_str()))
549                    .collect::<std::collections::HashMap<_, _>>()
550            );
551            json.to_string().into_bytes()
552        }
553    }
554}
555
556// ==========================================================================
557// Tests
558// ==========================================================================
559
560#[cfg(test)]
561mod tests {
562    use super::*;
563    use crate::api_request::range::Range;
564    use crate::api_request::types::*;
565    use crate::plan::call_plan::*;
566    use crate::plan::mutate_plan::*;
567    use crate::plan::read_plan::*;
568    use crate::plan::types::*;
569    use crate::test_helpers::TestPgDialect;
570    use crate::types::identifiers::QualifiedIdentifier;
571    use bytes::Bytes;
572    use compact_str::CompactString;
573    use smallvec::SmallVec;
574    use std::collections::HashMap;
575
576    fn dialect() -> &'static dyn SqlDialect {
577        &TestPgDialect
578    }
579
580    fn test_qi() -> QualifiedIdentifier {
581        QualifiedIdentifier::new("public", "users")
582    }
583
584    fn field(name: &str) -> CoercibleField {
585        CoercibleField::unknown(name.into(), SmallVec::new())
586    }
587
588    fn typed_field(name: &str, base_type: &str) -> CoercibleField {
589        CoercibleField::from_column(name.into(), SmallVec::new(), base_type.into())
590    }
591
592    fn select_field(name: &str) -> CoercibleSelectField {
593        CoercibleSelectField {
594            field: field(name),
595            agg_function: None,
596            agg_cast: None,
597            cast: None,
598            alias: None,
599        }
600    }
601
602    // ------------------------------------------------------------------
603    // Read plan tests
604    // ------------------------------------------------------------------
605
606    #[test]
607    fn test_read_plan_simple() {
608        let mut plan = ReadPlan::root(test_qi());
609        plan.select = vec![select_field("id"), select_field("name")];
610
611        let tree = ReadPlanTree::leaf(plan);
612        let b = read_plan_to_query(&tree, dialect());
613        let sql = b.sql();
614
615        assert!(sql.starts_with("SELECT "));
616        assert!(sql.contains("\"id\""));
617        assert!(sql.contains("\"name\""));
618        assert!(sql.contains("FROM \"public\".\"users\""));
619    }
620
621    #[test]
622    fn test_read_plan_with_where() {
623        let mut plan = ReadPlan::root(test_qi());
624        plan.select = vec![select_field("id")];
625        plan.where_ = vec![CoercibleLogicTree::Stmnt(CoercibleFilter::Filter {
626            field: field("id"),
627            op_expr: OpExpr::Expr {
628                negated: false,
629                operation: Operation::Quant(QuantOperator::Equal, None, "1".into()),
630            },
631        })];
632
633        let tree = ReadPlanTree::leaf(plan);
634        let b = read_plan_to_query(&tree, dialect());
635
636        assert!(b.sql().contains("WHERE"));
637        assert!(b.sql().contains("$1"));
638        assert_eq!(b.param_count(), 1);
639    }
640
641    #[test]
642    fn test_read_plan_with_order() {
643        let mut plan = ReadPlan::root(test_qi());
644        plan.select = vec![select_field("name")];
645        plan.order = vec![CoercibleOrderTerm::Term {
646            field: field("name"),
647            direction: Some(OrderDirection::Asc),
648            nulls: None,
649        }];
650
651        let tree = ReadPlanTree::leaf(plan);
652        let sql = read_plan_to_query(&tree, dialect()).sql().to_string();
653        assert!(sql.contains("ORDER BY"));
654        assert!(sql.contains("ASC"));
655    }
656
657    #[test]
658    fn test_read_plan_with_limit_offset() {
659        let mut plan = ReadPlan::root(test_qi());
660        plan.select = vec![select_field("id")];
661        plan.range = Range {
662            offset: 5,
663            limit_to: Some(14),
664        };
665
666        let tree = ReadPlanTree::leaf(plan);
667        let sql = read_plan_to_query(&tree, dialect()).sql().to_string();
668        assert!(sql.contains("LIMIT 10"));
669        assert!(sql.contains("OFFSET 5"));
670    }
671
672    #[test]
673    fn test_read_plan_default_select() {
674        let plan = ReadPlan::root(test_qi());
675        let tree = ReadPlanTree::leaf(plan);
676        let sql = read_plan_to_query(&tree, dialect()).sql().to_string();
677        assert!(sql.contains("\"public\".\"users\".*"));
678    }
679
680    #[test]
681    fn test_read_plan_count_query() {
682        let mut plan = ReadPlan::root(test_qi());
683        plan.where_ = vec![CoercibleLogicTree::Stmnt(CoercibleFilter::Filter {
684            field: field("status"),
685            op_expr: OpExpr::Expr {
686                negated: false,
687                operation: Operation::Quant(QuantOperator::Equal, None, "active".into()),
688            },
689        })];
690
691        let tree = ReadPlanTree::leaf(plan);
692        let b = read_plan_to_count_query(&tree, dialect());
693
694        assert!(b.sql().contains("COUNT(*)"));
695        assert!(b.sql().contains("_dbrst_count_t"));
696    }
697
698    #[test]
699    fn test_read_plan_with_lateral_join() {
700        use crate::schema_cache::relationship::{AnyRelationship, Cardinality, Relationship};
701
702        let root = ReadPlan::root(test_qi());
703        let mut child = ReadPlan::child(
704            QualifiedIdentifier::new("public", "posts"),
705            "posts".into(),
706            1,
707        );
708        child.select = vec![select_field("id"), select_field("title")];
709        child.rel_to_parent = Some(AnyRelationship::ForeignKey(Relationship {
710            table: QualifiedIdentifier::new("public", "users"),
711            foreign_table: QualifiedIdentifier::new("public", "posts"),
712            is_self: false,
713            cardinality: Cardinality::O2M {
714                constraint: "fk_posts".into(),
715                columns: smallvec::smallvec![("id".into(), "user_id".into())],
716            },
717            table_is_view: false,
718            foreign_table_is_view: false,
719        }));
720        child.rel_join_conds = vec![JoinCondition {
721            parent: (test_qi(), "id".into()),
722            child: (
723                QualifiedIdentifier::new("public", "posts"),
724                "user_id".into(),
725            ),
726        }];
727
728        let tree = ReadPlanTree::with_children(root, vec![ReadPlanTree::leaf(child)]);
729        let sql = read_plan_to_query(&tree, dialect()).sql().to_string();
730
731        assert!(sql.contains("LEFT JOIN LATERAL"));
732        assert!(sql.contains("json_agg"));
733        assert!(sql.contains("ON TRUE"));
734    }
735
736    // ------------------------------------------------------------------
737    // Mutate plan tests
738    // ------------------------------------------------------------------
739
740    #[test]
741    fn test_insert_query() {
742        let plan = MutatePlan::Insert(InsertPlan {
743            into: test_qi(),
744            columns: vec![typed_field("id", "integer"), typed_field("name", "text")],
745            body: Payload::RawJSON(Bytes::from(r#"[{"id":1,"name":"test"}]"#)),
746            on_conflict: None,
747            where_: vec![],
748            returning: vec![select_field("id")],
749            pk_cols: vec!["id".into()],
750            apply_defaults: false,
751        });
752
753        let b = mutate_plan_to_query(&plan, dialect());
754        let sql = b.sql();
755        assert!(sql.starts_with("INSERT INTO "));
756        assert!(sql.contains("json_to_recordset"));
757        assert!(sql.contains("RETURNING"));
758    }
759
760    #[test]
761    fn test_insert_with_on_conflict() {
762        let plan = MutatePlan::Insert(InsertPlan {
763            into: test_qi(),
764            columns: vec![typed_field("id", "integer"), typed_field("name", "text")],
765            body: Payload::RawJSON(Bytes::from(r#"[{"id":1,"name":"test"}]"#)),
766            on_conflict: Some(crate::plan::mutate_plan::OnConflict {
767                columns: vec!["id".into()],
768                merge_duplicates: true,
769            }),
770            where_: vec![],
771            returning: vec![],
772            pk_cols: vec!["id".into()],
773            apply_defaults: false,
774        });
775
776        let sql = mutate_plan_to_query(&plan, dialect()).sql().to_string();
777        assert!(sql.contains("ON CONFLICT"));
778        assert!(sql.contains("DO UPDATE SET"));
779        assert!(sql.contains("EXCLUDED"));
780    }
781
782    #[test]
783    fn test_insert_do_nothing() {
784        let plan = MutatePlan::Insert(InsertPlan {
785            into: test_qi(),
786            columns: vec![typed_field("id", "integer")],
787            body: Payload::RawJSON(Bytes::from(r#"[{"id":1}]"#)),
788            on_conflict: Some(crate::plan::mutate_plan::OnConflict {
789                columns: vec!["id".into()],
790                merge_duplicates: false,
791            }),
792            where_: vec![],
793            returning: vec![],
794            pk_cols: vec!["id".into()],
795            apply_defaults: false,
796        });
797
798        let sql = mutate_plan_to_query(&plan, dialect()).sql().to_string();
799        assert!(sql.contains("DO NOTHING"));
800    }
801
802    #[test]
803    fn test_update_query() {
804        let plan = MutatePlan::Update(UpdatePlan {
805            into: test_qi(),
806            columns: vec![typed_field("name", "text")],
807            body: Payload::RawJSON(Bytes::from(r#"{"name":"updated"}"#)),
808            where_: vec![CoercibleLogicTree::Stmnt(CoercibleFilter::Filter {
809                field: field("id"),
810                op_expr: OpExpr::Expr {
811                    negated: false,
812                    operation: Operation::Quant(QuantOperator::Equal, None, "1".into()),
813                },
814            })],
815            returning: vec![select_field("id"), select_field("name")],
816            apply_defaults: false,
817        });
818
819        let sql = mutate_plan_to_query(&plan, dialect()).sql().to_string();
820        assert!(sql.starts_with("UPDATE "));
821        assert!(sql.contains("SET "));
822        assert!(sql.contains("WHERE"));
823        assert!(sql.contains("RETURNING"));
824    }
825
826    #[test]
827    fn test_delete_query() {
828        let plan = MutatePlan::Delete(DeletePlan {
829            from: test_qi(),
830            where_: vec![CoercibleLogicTree::Stmnt(CoercibleFilter::Filter {
831                field: field("id"),
832                op_expr: OpExpr::Expr {
833                    negated: false,
834                    operation: Operation::Quant(QuantOperator::Equal, None, "1".into()),
835                },
836            })],
837            returning: vec![],
838        });
839
840        let sql = mutate_plan_to_query(&plan, dialect()).sql().to_string();
841        assert!(sql.starts_with("DELETE FROM "));
842        assert!(sql.contains("WHERE"));
843    }
844
845    // ------------------------------------------------------------------
846    // Call plan tests
847    // ------------------------------------------------------------------
848
849    #[test]
850    fn test_call_plan_named_args() {
851        let mut args = HashMap::new();
852        args.insert(CompactString::from("a"), RpcParamValue::Fixed("1".into()));
853        args.insert(CompactString::from("b"), RpcParamValue::Fixed("2".into()));
854
855        let plan = CallPlan {
856            qi: QualifiedIdentifier::new("public", "add_numbers"),
857            params: CallParams::KeyParams(vec![
858                crate::schema_cache::routine::RoutineParam {
859                    name: "a".into(),
860                    pg_type: "integer".into(),
861                    type_max_length: "integer".into(),
862                    required: true,
863                    is_variadic: false,
864                },
865                crate::schema_cache::routine::RoutineParam {
866                    name: "b".into(),
867                    pg_type: "integer".into(),
868                    type_max_length: "integer".into(),
869                    required: true,
870                    is_variadic: false,
871                },
872            ]),
873            args: CallArgs::DirectArgs(args),
874            scalar: true,
875            set_of_scalar: false,
876            filter_fields: vec![],
877            returning: vec![],
878        };
879
880        let sql = call_plan_to_query(&plan, dialect()).sql().to_string();
881        assert!(sql.starts_with("SELECT * FROM \"public\".\"add_numbers\"("));
882        assert!(sql.contains(":="));
883    }
884
885    #[test]
886    fn test_call_plan_json_body() {
887        let plan = CallPlan {
888            qi: QualifiedIdentifier::new("public", "process_data"),
889            params: CallParams::KeyParams(vec![]),
890            args: CallArgs::JsonArgs(Some(Bytes::from(r#"{"key":"value"}"#))),
891            scalar: false,
892            set_of_scalar: false,
893            filter_fields: vec![],
894            returning: vec![],
895        };
896
897        let b = call_plan_to_query(&plan, dialect());
898        assert!(b.sql().contains("$1"));
899        assert_eq!(b.param_count(), 1);
900    }
901
902    #[test]
903    fn test_call_plan_no_args() {
904        let plan = CallPlan {
905            qi: QualifiedIdentifier::new("public", "get_time"),
906            params: CallParams::KeyParams(vec![]),
907            args: CallArgs::JsonArgs(None),
908            scalar: true,
909            set_of_scalar: false,
910            filter_fields: vec![],
911            returning: vec![],
912        };
913
914        let sql = call_plan_to_query(&plan, dialect()).sql().to_string();
915        assert_eq!(sql, "SELECT * FROM \"public\".\"get_time\"()");
916    }
917
918    #[test]
919    fn test_call_plan_variadic() {
920        let mut args = HashMap::new();
921        args.insert(
922            CompactString::from("vals"),
923            RpcParamValue::Variadic(vec!["a".into(), "b".into(), "c".into()]),
924        );
925
926        let plan = CallPlan {
927            qi: QualifiedIdentifier::new("public", "concat_vals"),
928            params: CallParams::KeyParams(vec![crate::schema_cache::routine::RoutineParam {
929                name: "vals".into(),
930                pg_type: "text".into(),
931                type_max_length: "text".into(),
932                required: true,
933                is_variadic: true,
934            }]),
935            args: CallArgs::DirectArgs(args),
936            scalar: true,
937            set_of_scalar: false,
938            filter_fields: vec![],
939            returning: vec![],
940        };
941
942        let sql = call_plan_to_query(&plan, dialect()).sql().to_string();
943        assert!(sql.contains("VARIADIC ARRAY["));
944    }
945
946    // ------------------------------------------------------------------
947    // Payload helpers
948    // ------------------------------------------------------------------
949
950    #[test]
951    fn test_payload_to_bytes_raw_json() {
952        let payload = Payload::RawJSON(Bytes::from(r#"[{"id":1}]"#));
953        let bytes = payload_to_bytes(&payload);
954        assert_eq!(bytes, b"[{\"id\":1}]");
955    }
956
957    #[test]
958    fn test_insert_default_values() {
959        let plan = MutatePlan::Insert(InsertPlan {
960            into: test_qi(),
961            columns: vec![],
962            body: Payload::RawJSON(Bytes::from("{}")),
963            on_conflict: None,
964            where_: vec![],
965            returning: vec![],
966            pk_cols: vec![],
967            apply_defaults: true,
968        });
969
970        let sql = mutate_plan_to_query(&plan, dialect()).sql().to_string();
971        assert!(sql.contains("DEFAULT VALUES"));
972    }
973}