Skip to main content

dbrest_core/plan/
mod.rs

1//! Plan module for dbrest
2//!
3//! Transforms `ApiRequest` + `SchemaCache` into typed execution plans.
4//! This is the core request planner that sits between API request parsing
5//! and SQL generation.
6//!
7//! # Architecture
8//!
9//! ```text
10//! ApiRequest ──┐
11//!              ├──▶ action_plan() ──▶ ActionPlan
12//! SchemaCache ─┘
13//!                                      ├─ WrappedReadPlan  (GET / HEAD)
14//!                                      ├─ MutateReadPlan   (POST / PATCH / PUT / DELETE)
15//!                                      └─ CallReadPlan     (RPC)
16//! ```
17
18pub mod call_plan;
19pub mod mutate_plan;
20pub mod negotiate;
21pub mod read_plan;
22pub mod types;
23
24// Re-export key types
25pub use call_plan::{CallArgs, CallParams, CallPlan, RpcParamValue};
26pub use mutate_plan::{DeletePlan, InsertPlan, MutatePlan, OnConflict, UpdatePlan};
27pub use negotiate::negotiate_content;
28pub use read_plan::{JoinCondition, ReadPlan, ReadPlanTree};
29pub use types::*;
30
31use compact_str::CompactString;
32
33use crate::api_request::types::{Action, DbAction, InvokeMethod, Mutation, OrderTerm, SelectItem};
34use crate::api_request::{ApiRequest, Preferences, QueryParams};
35use crate::config::AppConfig;
36use crate::error::Error;
37use crate::schema_cache::SchemaCache;
38use crate::schema_cache::media_handler::ResolvedHandler;
39use crate::schema_cache::relationship::AnyRelationship;
40use crate::schema_cache::routine::Routine;
41use crate::schema_cache::table::Table;
42use crate::types::identifiers::{QualifiedIdentifier, RelIdentifier};
43use crate::types::media::MediaType;
44
45use crate::api_request::preferences::{PreferRepresentation, PreferTransaction};
46use crate::config::types::IsolationLevel;
47
48// ==========================================================================
49// ActionPlan -- top-level plan type
50// ==========================================================================
51
52/// Top-level action plan
53#[derive(Debug)]
54#[allow(clippy::large_enum_variant)]
55pub enum ActionPlan {
56    /// A database action plan
57    Db(DbActionPlan),
58    /// A non-database info plan
59    NoDb(InfoPlan),
60}
61
62/// A database action plan
63#[derive(Debug)]
64#[allow(clippy::large_enum_variant)]
65pub enum DbActionPlan {
66    /// A CRUD operation (read, mutate, or call)
67    DbCrud {
68        /// Whether to include EXPLAIN output
69        is_explain: bool,
70        /// The CRUD plan
71        plan: CrudPlan,
72    },
73    /// An inspect operation (schema introspection via GET /)
74    MayUseDb(InspectPlan),
75}
76
77/// A CRUD plan
78#[derive(Debug)]
79#[allow(clippy::large_enum_variant)]
80pub enum CrudPlan {
81    /// Wrapped read plan (GET / HEAD)
82    WrappedReadPlan {
83        read_plan: ReadPlanTree,
84        tx_mode: TxMode,
85        handler: ResolvedHandler,
86        media: MediaType,
87        headers_only: bool,
88        qi: QualifiedIdentifier,
89    },
90    /// Mutate + read plan (POST / PATCH / PUT / DELETE)
91    MutateReadPlan {
92        read_plan: ReadPlanTree,
93        mutate_plan: MutatePlan,
94        tx_mode: TxMode,
95        handler: ResolvedHandler,
96        media: MediaType,
97        mutation: Mutation,
98        qi: QualifiedIdentifier,
99    },
100    /// Call + read plan (RPC)
101    CallReadPlan {
102        read_plan: ReadPlanTree,
103        call_plan: CallPlan,
104        tx_mode: TxMode,
105        proc: Routine,
106        handler: ResolvedHandler,
107        media: MediaType,
108        inv_method: InvokeMethod,
109        qi: QualifiedIdentifier,
110    },
111}
112
113/// Inspect plan (for GET / on the root)
114#[derive(Debug)]
115pub struct InspectPlan {
116    pub media: MediaType,
117    pub tx_mode: TxMode,
118    pub headers_only: bool,
119    pub schema: CompactString,
120}
121
122/// Non-database info plan
123#[derive(Debug)]
124#[allow(clippy::large_enum_variant)]
125pub enum InfoPlan {
126    /// OPTIONS on a relation
127    RelInfoPlan(QualifiedIdentifier),
128    /// OPTIONS on a routine
129    RoutineInfoPlan(Routine),
130    /// OPTIONS on root schema
131    SchemaInfoPlan,
132}
133
134/// Transaction mode settings
135#[derive(Debug, Clone)]
136pub struct TxMode {
137    /// Isolation level for the transaction
138    pub isolation_level: IsolationLevel,
139    /// Whether to rollback the transaction
140    pub rollback: bool,
141}
142
143impl TxMode {
144    /// Default transaction mode
145    pub fn default_mode() -> Self {
146        Self {
147            isolation_level: IsolationLevel::ReadCommitted,
148            rollback: false,
149        }
150    }
151
152    /// Read-only transaction mode
153    pub fn read_only() -> Self {
154        Self {
155            isolation_level: IsolationLevel::ReadCommitted,
156            rollback: false,
157        }
158    }
159}
160
161// ==========================================================================
162// action_plan -- main entry point
163// ==========================================================================
164
165/// Build an action plan from an API request
166///
167/// This is the main entry point for planning. It resolves the action type,
168/// finds the relevant table/routine, builds the appropriate plan, and
169/// negotiates content types.
170pub fn action_plan(
171    config: &AppConfig,
172    api_request: &ApiRequest,
173    schema_cache: &SchemaCache,
174) -> Result<ActionPlan, Error> {
175    let action = &api_request.action;
176
177    match action {
178        // Info actions (OPTIONS) -> no DB needed
179        Action::RelationInfo(qi) => Ok(ActionPlan::NoDb(InfoPlan::RelInfoPlan(qi.clone()))),
180        Action::RoutineInfo(qi, _) => {
181            let proc = find_proc(schema_cache, qi)?;
182            Ok(ActionPlan::NoDb(InfoPlan::RoutineInfoPlan(proc.clone())))
183        }
184        Action::SchemaInfo => Ok(ActionPlan::NoDb(InfoPlan::SchemaInfoPlan)),
185
186        // DB actions
187        Action::Db(db_action) => match db_action {
188            // Schema read (GET /)
189            DbAction::SchemaRead {
190                schema,
191                headers_only,
192            } => {
193                let tx_mode = resolve_tx_mode(config, &api_request.preferences, true);
194                let media = api_request
195                    .accept_media_types
196                    .first()
197                    .cloned()
198                    .unwrap_or(MediaType::ApplicationOpenApi);
199
200                Ok(ActionPlan::Db(DbActionPlan::MayUseDb(InspectPlan {
201                    media,
202                    tx_mode,
203                    headers_only: *headers_only,
204                    schema: schema.clone(),
205                })))
206            }
207
208            // Relation read (GET / HEAD on a table)
209            DbAction::RelationRead { qi, headers_only } => {
210                let table = find_table(schema_cache, qi)?;
211                let rel_id = RelIdentifier::Table(qi.clone());
212
213                let handler = negotiate_content(
214                    &api_request.accept_media_types,
215                    &schema_cache.media_handlers,
216                    &rel_id,
217                    action,
218                    config.db_plan_enabled,
219                )?;
220
221                let read_tree = build_read_plan(config, schema_cache, api_request, qi, table)?;
222
223                let tx_mode = resolve_tx_mode(config, &api_request.preferences, true);
224
225                Ok(ActionPlan::Db(DbActionPlan::DbCrud {
226                    is_explain: false,
227                    plan: CrudPlan::WrappedReadPlan {
228                        read_plan: read_tree,
229                        tx_mode,
230                        handler: handler.clone(),
231                        media: handler.1,
232                        headers_only: *headers_only,
233                        qi: qi.clone(),
234                    },
235                }))
236            }
237
238            // Relation mutation (POST / PATCH / PUT / DELETE)
239            DbAction::RelationMut { qi, mutation } => {
240                let table = find_table(schema_cache, qi)?;
241                let rel_id = RelIdentifier::Table(qi.clone());
242
243                let handler = negotiate_content(
244                    &api_request.accept_media_types,
245                    &schema_cache.media_handlers,
246                    &rel_id,
247                    action,
248                    config.db_plan_enabled,
249                )?;
250
251                let read_tree = build_read_plan(config, schema_cache, api_request, qi, table)?;
252
253                let mutate = build_mutate_plan(qi, table, *mutation, api_request)?;
254
255                let tx_mode = resolve_tx_mode(config, &api_request.preferences, false);
256
257                Ok(ActionPlan::Db(DbActionPlan::DbCrud {
258                    is_explain: false,
259                    plan: CrudPlan::MutateReadPlan {
260                        read_plan: read_tree,
261                        mutate_plan: mutate,
262                        tx_mode,
263                        handler: handler.clone(),
264                        media: handler.1,
265                        mutation: *mutation,
266                        qi: qi.clone(),
267                    },
268                }))
269            }
270
271            // Routine invocation (RPC)
272            DbAction::Routine { qi, inv_method } => {
273                let proc = find_proc(schema_cache, qi)?;
274                let rel_id = proc
275                    .table_qi()
276                    .map(|tqi| RelIdentifier::Table(tqi.clone()))
277                    .unwrap_or(RelIdentifier::AnyElement);
278
279                let handler = negotiate_content(
280                    &api_request.accept_media_types,
281                    &schema_cache.media_handlers,
282                    &rel_id,
283                    action,
284                    config.db_plan_enabled,
285                )?;
286
287                let read_tree = build_call_read_plan(config, schema_cache, api_request, qi, proc)?;
288
289                let call = build_call_plan(proc, api_request)?;
290
291                let is_read = matches!(inv_method, InvokeMethod::InvRead(_));
292                let tx_mode = resolve_tx_mode(config, &api_request.preferences, is_read);
293
294                Ok(ActionPlan::Db(DbActionPlan::DbCrud {
295                    is_explain: false,
296                    plan: CrudPlan::CallReadPlan {
297                        read_plan: read_tree,
298                        call_plan: call,
299                        tx_mode,
300                        proc: proc.clone(),
301                        handler: handler.clone(),
302                        media: handler.1,
303                        inv_method: inv_method.clone(),
304                        qi: qi.clone(),
305                    },
306                }))
307            }
308        },
309    }
310}
311
312// ==========================================================================
313// Schema cache lookups
314// ==========================================================================
315
316/// Find a table in the schema cache
317pub fn find_table<'a>(
318    schema_cache: &'a SchemaCache,
319    qi: &QualifiedIdentifier,
320) -> Result<&'a Table, Error> {
321    schema_cache
322        .get_table(qi)
323        .ok_or_else(|| Error::TableNotFound {
324            name: qi.to_string(),
325            suggestion: None,
326        })
327}
328
329/// Find a routine (function) in the schema cache
330pub fn find_proc<'a>(
331    schema_cache: &'a SchemaCache,
332    qi: &QualifiedIdentifier,
333) -> Result<&'a Routine, Error> {
334    schema_cache
335        .get_routines(qi)
336        .and_then(|routines| routines.first())
337        .ok_or_else(|| Error::FunctionNotFound {
338            name: qi.to_string(),
339        })
340}
341
342/// Find relationships between two tables
343pub fn find_rels<'a>(
344    schema_cache: &'a SchemaCache,
345    source: &QualifiedIdentifier,
346    target_name: &str,
347) -> Vec<&'a AnyRelationship> {
348    schema_cache.find_relationships_to(source, target_name)
349}
350
351// ==========================================================================
352// Read plan builder
353// ==========================================================================
354
355/// Build a ReadPlanTree from an API request for a relation
356fn build_read_plan(
357    config: &AppConfig,
358    schema_cache: &SchemaCache,
359    api_request: &ApiRequest,
360    qi: &QualifiedIdentifier,
361    table: &Table,
362) -> Result<ReadPlanTree, Error> {
363    let qp = &api_request.query_params;
364
365    // Initialize root read plan
366    let mut root = ReadPlan::root(qi.clone());
367
368    // Add select fields
369    root.select = resolve_select(&qp.select, Some(table))?;
370
371    // Expand wildcard selects to individual columns.
372    // This is needed for backends like SQLite that cannot aggregate row aliases
373    // (e.g. `json_agg(alias)`) and need explicit column names.
374    expand_star_select(&mut root.select, table);
375
376    // Add filters (resolve column types for proper casting)
377    root.where_ = resolve_filters(&qp.filters_root, Some(table))?;
378
379    // Add orders
380    root.order = resolve_orders(&qp.order, Some(table));
381
382    // Add range
383    root.range = api_request.top_level_range;
384
385    // Build children from embedded relations in select
386    let children = build_children(config, schema_cache, qi, &qp.select, qp, 1)?;
387
388    // Restrict range based on config max rows
389    let mut tree = ReadPlanTree::with_children(root, children);
390    if let Some(max_rows) = config.db_max_rows {
391        restrict_range(&mut tree, max_rows);
392    }
393
394    Ok(tree)
395}
396
397/// Build a ReadPlanTree for an RPC call
398fn build_call_read_plan(
399    config: &AppConfig,
400    schema_cache: &SchemaCache,
401    api_request: &ApiRequest,
402    qi: &QualifiedIdentifier,
403    proc: &Routine,
404) -> Result<ReadPlanTree, Error> {
405    let qp = &api_request.query_params;
406
407    // For RPC, the "table" is the function's return type
408    let from_qi = proc.table_qi().cloned().unwrap_or_else(|| qi.clone());
409
410    let mut root = ReadPlan::root(from_qi.clone());
411    // RPC return type — look up table in cache for type resolution
412    let rpc_table = schema_cache.get_table(&from_qi);
413    root.select = resolve_select(&qp.select, rpc_table)?;
414    root.where_ = resolve_filters(&qp.filters_root, rpc_table)?;
415    root.order = resolve_orders(&qp.order, rpc_table);
416    root.range = api_request.top_level_range;
417
418    let children = build_children(config, schema_cache, &from_qi, &qp.select, qp, 1)?;
419
420    let mut tree = ReadPlanTree::with_children(root, children);
421    if let Some(max_rows) = config.db_max_rows {
422        restrict_range(&mut tree, max_rows);
423    }
424
425    Ok(tree)
426}
427
428/// Build child read plans from embedded relations in the select.
429///
430/// Each child receives a unique `rel_agg_alias` based on depth and
431/// sibling index to avoid alias collisions in the generated SQL.
432fn build_children(
433    config: &AppConfig,
434    schema_cache: &SchemaCache,
435    parent_qi: &QualifiedIdentifier,
436    select_items: &[SelectItem],
437    qp: &QueryParams,
438    depth: usize,
439) -> Result<Vec<ReadPlanTree>, Error> {
440    let mut children = Vec::new();
441    let mut sibling_idx: usize = 0;
442
443    for item in select_items {
444        match item {
445            SelectItem::Relation {
446                relation,
447                alias,
448                hint,
449                join_type,
450                children: sub_select,
451            } => {
452                let mut child_tree = build_child_plan(
453                    config,
454                    schema_cache,
455                    parent_qi,
456                    relation,
457                    alias.as_ref(),
458                    hint.as_ref(),
459                    *join_type,
460                    sub_select,
461                    qp,
462                    depth,
463                    false, // not spread
464                )?;
465                // Unique alias: depth + sibling index
466                child_tree.node.rel_agg_alias =
467                    CompactString::from(format!("dbrst_agg_{}_{}", depth, sibling_idx));
468                sibling_idx += 1;
469                children.push(child_tree);
470            }
471            SelectItem::Spread {
472                relation,
473                hint,
474                join_type,
475                children: sub_select,
476            } => {
477                let mut child_tree = build_child_plan(
478                    config,
479                    schema_cache,
480                    parent_qi,
481                    relation,
482                    None,
483                    hint.as_ref(),
484                    *join_type,
485                    sub_select,
486                    qp,
487                    depth,
488                    true, // spread
489                )?;
490                child_tree.node.rel_agg_alias =
491                    CompactString::from(format!("dbrst_agg_{}_{}", depth, sibling_idx));
492                sibling_idx += 1;
493                children.push(child_tree);
494            }
495            SelectItem::Field { .. } => {
496                // Fields are handled at the select resolution level, not as children
497            }
498        }
499    }
500
501    Ok(children)
502}
503
504/// Build a single child read plan for an embedded relation
505#[allow(clippy::too_many_arguments)]
506fn build_child_plan(
507    config: &AppConfig,
508    schema_cache: &SchemaCache,
509    parent_qi: &QualifiedIdentifier,
510    relation_name: &str,
511    alias: Option<&CompactString>,
512    hint: Option<&CompactString>,
513    join_type: Option<crate::api_request::types::JoinType>,
514    sub_select: &[SelectItem],
515    qp: &QueryParams,
516    depth: usize,
517    is_spread: bool,
518) -> Result<ReadPlanTree, Error> {
519    // Find the relationship
520    let rels = find_rels(schema_cache, parent_qi, relation_name);
521
522    let rel = if rels.is_empty() {
523        // Try as a table in the same schema
524        let child_qi = QualifiedIdentifier::new(parent_qi.schema.clone(), relation_name);
525        if schema_cache.get_table(&child_qi).is_some() {
526            // No FK relationship, but table exists — use it as a subquery
527            None
528        } else {
529            return Err(Error::RelationshipNotFound {
530                from_table: parent_qi.to_string(),
531                to_table: relation_name.to_string(),
532            });
533        }
534    } else if rels.len() == 1 {
535        Some(rels[0])
536    } else {
537        // Multiple relationships found — try to disambiguate with hint
538        if let Some(hint) = hint {
539            rels.iter()
540                .find(|r| {
541                    if let Some(fk) = r.as_fk() {
542                        fk.constraint_name() == hint.as_str()
543                    } else {
544                        false
545                    }
546                })
547                .copied()
548        } else {
549            return Err(Error::AmbiguousEmbedding(relation_name.to_string()));
550        }
551    };
552
553    // Build the child QI
554    let child_qi = if let Some(rel) = rel {
555        rel.foreign_table().clone()
556    } else {
557        QualifiedIdentifier::new(parent_qi.schema.clone(), relation_name)
558    };
559
560    let mut child_plan = ReadPlan::child(child_qi.clone(), relation_name.into(), depth);
561
562    // Set relationship metadata
563    if let Some(rel) = rel {
564        child_plan.rel_to_parent = Some(rel.clone());
565
566        // Build join conditions
567        if let Some(fk) = rel.as_fk() {
568            for (src, tgt) in fk.columns() {
569                child_plan.rel_join_conds.push(JoinCondition {
570                    parent: (parent_qi.clone(), src.clone()),
571                    child: (child_qi.clone(), tgt.clone()),
572                });
573            }
574        }
575    }
576
577    child_plan.rel_alias = alias.cloned();
578    child_plan.rel_hint = hint.cloned();
579    child_plan.rel_join_type = join_type;
580
581    if is_spread {
582        child_plan.rel_spread = Some(SpreadType::ToOneSpread);
583    }
584
585    // Get filters for this embed path
586    let embed_path = vec![CompactString::from(relation_name)];
587    let child_filters: Vec<_> = qp
588        .filters_not_root
589        .iter()
590        .filter(|(path, _)| *path == embed_path)
591        .map(|(_, f)| f.clone())
592        .collect();
593    let child_table = schema_cache.get_table(&child_qi);
594
595    // Resolve select, filters, and orders for the child
596    child_plan.select = resolve_select(sub_select, child_table)?;
597    child_plan.where_ = resolve_filters(&child_filters, child_table)?;
598
599    // Get orders for this embed path
600    let child_orders: Vec<_> = qp
601        .order
602        .iter()
603        .filter(|(path, _)| *path == embed_path)
604        .flat_map(|(_, orders)| orders.clone())
605        .collect();
606    child_plan.order = resolve_order_terms(&child_orders, child_table);
607
608    // Recursively build grandchildren
609    let grandchildren = build_children(config, schema_cache, &child_qi, sub_select, qp, depth + 1)?;
610
611    Ok(ReadPlanTree::with_children(child_plan, grandchildren))
612}
613
614// ==========================================================================
615// Mutate plan builder
616// ==========================================================================
617
618/// Build a MutatePlan from an API request
619fn build_mutate_plan(
620    qi: &QualifiedIdentifier,
621    table: &Table,
622    mutation: Mutation,
623    api_request: &ApiRequest,
624) -> Result<MutatePlan, Error> {
625    let qp = &api_request.query_params;
626
627    match mutation {
628        Mutation::MutationCreate | Mutation::MutationSingleUpsert => {
629            let payload = api_request.payload.clone().ok_or(Error::MissingPayload)?;
630
631            let columns = resolve_mutation_columns(table, &api_request.columns);
632
633            let on_conflict = if mutation == Mutation::MutationSingleUpsert {
634                Some(OnConflict {
635                    columns: table.pk_cols.iter().cloned().collect(),
636                    merge_duplicates: true,
637                })
638            } else {
639                qp.on_conflict.as_ref().map(|cols| OnConflict {
640                    columns: cols.clone(),
641                    merge_duplicates: api_request
642                        .preferences
643                        .resolution
644                        .map(|r| {
645                            matches!(
646                                r,
647                                crate::api_request::preferences::PreferResolution::MergeDuplicates
648                            )
649                        })
650                        .unwrap_or(false),
651                })
652            };
653
654            let apply_defaults = api_request
655                .preferences
656                .missing
657                .map(|m| {
658                    matches!(
659                        m,
660                        crate::api_request::preferences::PreferMissing::ApplyDefaults
661                    )
662                })
663                .unwrap_or(false);
664
665            Ok(MutatePlan::Insert(InsertPlan {
666                into: qi.clone(),
667                columns,
668                body: payload,
669                on_conflict,
670                where_: resolve_filters(&qp.filters_root, Some(table))?,
671                returning: resolve_returning(table, &api_request.preferences),
672                pk_cols: table.pk_cols.iter().cloned().collect(),
673                apply_defaults,
674            }))
675        }
676        Mutation::MutationUpdate => {
677            let payload = api_request.payload.clone().ok_or(Error::MissingPayload)?;
678
679            let columns = resolve_mutation_columns(table, &api_request.columns);
680
681            let apply_defaults = api_request
682                .preferences
683                .missing
684                .map(|m| {
685                    matches!(
686                        m,
687                        crate::api_request::preferences::PreferMissing::ApplyDefaults
688                    )
689                })
690                .unwrap_or(false);
691
692            Ok(MutatePlan::Update(UpdatePlan {
693                into: qi.clone(),
694                columns,
695                body: payload,
696                where_: resolve_filters(&qp.filters_root, Some(table))?,
697                returning: resolve_returning(table, &api_request.preferences),
698                apply_defaults,
699            }))
700        }
701        Mutation::MutationDelete => Ok(MutatePlan::Delete(DeletePlan {
702            from: qi.clone(),
703            where_: resolve_filters(&qp.filters_root, Some(table))?,
704            returning: resolve_returning(table, &api_request.preferences),
705        })),
706    }
707}
708
709// ==========================================================================
710// Call plan builder
711// ==========================================================================
712
713/// Build a CallPlan from an API request
714fn build_call_plan(proc: &Routine, api_request: &ApiRequest) -> Result<CallPlan, Error> {
715    let qp = &api_request.query_params;
716
717    // Determine call params
718    let params =
719        if proc.param_count() == 1 && proc.params[0].is_json_type() && !qp.params.is_empty() {
720            // Single JSON param — can use positional
721            CallParams::OnePosParam(proc.params[0].clone())
722        } else {
723            CallParams::KeyParams(proc.params.to_vec())
724        };
725
726    // Build call args
727    let args = if !qp.params.is_empty() {
728        // From query parameters
729        let rpc_params = call_plan::to_rpc_params(proc, &qp.params);
730        CallArgs::DirectArgs(rpc_params)
731    } else if let Some(ref payload) = api_request.payload {
732        // From body
733        match payload {
734            crate::api_request::types::Payload::ProcessedJSON { raw, .. }
735            | crate::api_request::types::Payload::RawJSON(raw) => {
736                CallArgs::JsonArgs(Some(raw.clone()))
737            }
738            crate::api_request::types::Payload::RawPayload(raw) => {
739                CallArgs::JsonArgs(Some(raw.clone()))
740            }
741            crate::api_request::types::Payload::ProcessedUrlEncoded { params, .. } => {
742                let rpc_params: std::collections::HashMap<CompactString, RpcParamValue> = params
743                    .iter()
744                    .map(|(k, v)| (k.clone(), RpcParamValue::Fixed(v.clone())))
745                    .collect();
746                CallArgs::DirectArgs(rpc_params)
747            }
748        }
749    } else {
750        CallArgs::JsonArgs(None)
751    };
752
753    Ok(CallPlan {
754        qi: proc.qi(),
755        params,
756        args,
757        scalar: proc.returns_scalar(),
758        set_of_scalar: proc.returns_set_of_scalar(),
759        filter_fields: qp.filter_fields.iter().cloned().collect(),
760        returning: vec![],
761    })
762}
763
764// ==========================================================================
765// Cast validation
766// ==========================================================================
767
768/// Validate a cast type name
769///
770/// Checks that the cast type has valid syntax (alphanumeric, underscores, spaces).
771/// The actual type existence will be validated by PostgreSQL.
772fn validate_cast_type(cast_type: &str) -> Result<(), Error> {
773    let cast_type = cast_type.trim();
774
775    // Empty cast type is invalid
776    if cast_type.is_empty() {
777        return Err(Error::InvalidQueryParam {
778            param: "select".to_string(),
779            message: "empty cast type".to_string(),
780        });
781    }
782
783    // Check for valid characters: alphanumeric, underscore, space, parentheses, brackets
784    // PostgreSQL allows types like "character varying", "int4", "text[]", "numeric(10,2)"
785    let is_valid = cast_type.chars().all(|c| {
786        c.is_alphanumeric()
787            || c == '_'
788            || c == ' '
789            || c == '('
790            || c == ')'
791            || c == '['
792            || c == ']'
793            || c == ','
794    });
795
796    if !is_valid {
797        return Err(Error::InvalidQueryParam {
798            param: "select".to_string(),
799            message: format!("invalid cast type: {}", cast_type),
800        });
801    }
802
803    Ok(())
804}
805
806// ==========================================================================
807// Resolution helpers
808// ==========================================================================
809
810/// Expand `*` (full_row) select fields into individual column selects.
811///
812/// Backends like SQLite cannot aggregate a row alias (e.g. `json_agg(alias)`)
813/// and need explicit column names. By expanding `*` here, the downstream
814/// SQL generation always has concrete column names available.
815fn expand_star_select(select: &mut Vec<CoercibleSelectField>, table: &Table) {
816    let is_star = |sf: &CoercibleSelectField| sf.field.full_row || sf.field.name.as_str() == "*";
817    let needs_expand = select.is_empty() || select.iter().any(is_star);
818    if !needs_expand {
819        return;
820    }
821
822    let mut expanded = Vec::with_capacity(table.column_count());
823    for sf in select.drain(..) {
824        if is_star(&sf) {
825            expand_all_columns(&mut expanded, table);
826        } else {
827            expanded.push(sf);
828        }
829    }
830    // If select was empty, expand to all columns
831    if expanded.is_empty() {
832        expand_all_columns(&mut expanded, table);
833    }
834    *select = expanded;
835}
836
837fn expand_all_columns(out: &mut Vec<CoercibleSelectField>, table: &Table) {
838    for (col_name, col) in table.columns.iter() {
839        out.push(CoercibleSelectField {
840            field: CoercibleField::from_column(
841                col_name.clone(),
842                smallvec::SmallVec::new(),
843                col.data_type.clone(),
844            )
845            .with_to_json(Some(col)),
846            agg_function: None,
847            agg_cast: None,
848            cast: None,
849            alias: None,
850        });
851    }
852}
853
854/// Resolve select items into coercible select fields
855///
856/// When a `table` is provided, fields are resolved against the table's columns
857/// and computed fields so that `base_type` is set for proper type casting.
858fn resolve_select(
859    items: &[SelectItem],
860    table: Option<&Table>,
861) -> Result<Vec<CoercibleSelectField>, Error> {
862    let mut result = Vec::new();
863
864    for item in items {
865        match item {
866            SelectItem::Field {
867                field,
868                alias,
869                cast,
870                aggregate,
871                aggregate_cast,
872            } => {
873                let resolved_field = if let Some(t) = table {
874                    // Check regular columns first
875                    if let Some(col) = t.get_column(&field.0) {
876                        CoercibleField::from_column(
877                            field.0.clone(),
878                            field.1.clone(),
879                            col.data_type.clone(),
880                        )
881                        .with_to_json(Some(col))
882                    } else if let Some(computed) = t.get_computed_field(&field.0) {
883                        // Check computed fields
884                        CoercibleField::from_computed_field(
885                            field.0.clone(),
886                            field.1.clone(),
887                            computed.function.clone(),
888                            computed.return_type.clone(),
889                        )
890                        // Computed fields don't need to_jsonb wrapper
891                    } else {
892                        // Column/computed field not found - return error instead of creating unknown field
893                        // Exception: allow "*" wildcard for select-all
894                        if field.0.as_str() == "*" {
895                            CoercibleField::unknown(field.0.clone(), field.1.clone())
896                                .with_to_json(None)
897                        } else {
898                            return Err(Error::ColumnNotFound {
899                                table: t.qi().to_string(),
900                                column: field.0.to_string(),
901                            });
902                        }
903                    }
904                } else {
905                    CoercibleField::unknown(field.0.clone(), field.1.clone()).with_to_json(None)
906                };
907
908                // Validate cast types if present
909                if let Some(cast_type) = cast {
910                    validate_cast_type(cast_type)?;
911                }
912                if let Some(agg_cast_type) = aggregate_cast {
913                    validate_cast_type(agg_cast_type)?;
914                }
915
916                result.push(CoercibleSelectField {
917                    field: resolved_field,
918                    agg_function: *aggregate,
919                    agg_cast: aggregate_cast.clone(),
920                    cast: cast.clone(),
921                    alias: alias.clone(),
922                });
923            }
924            _ => {
925                // Relations/Spreads are handled as children - skip here
926            }
927        }
928    }
929
930    Ok(result)
931}
932
933/// Resolve filters into coercible logic trees.
934///
935/// When a `table` is provided, each filter field is resolved against the
936/// table's columns so that `base_type` is set. This allows the query builder
937/// to emit explicit `::type` casts on bind-parameter placeholders.
938/// Also checks for computed fields if the column is not found.
939fn resolve_filters(
940    filters: &[crate::api_request::types::Filter],
941    table: Option<&Table>,
942) -> Result<Vec<CoercibleLogicTree>, Error> {
943    filters
944        .iter()
945        .map(|f| {
946            let field = if let Some(t) = table {
947                // Check regular columns first
948                if let Some(col) = t.get_column(&f.field.0) {
949                    let mut field = CoercibleField::from_column(
950                        f.field.0.clone(),
951                        f.field.1.clone(),
952                        col.data_type.clone(),
953                    );
954                    // Trace: check if column is composite/array and JSON path is present
955                    if !f.field.1.is_empty() {
956                        tracing::trace!(
957                            "Filter field '{}' has JSON path: {:?}, is_composite: {}, is_array: {}",
958                            f.field.0,
959                            f.field.1,
960                            col.is_composite_type(),
961                            col.is_array_type()
962                        );
963                    }
964                    field = field.with_to_json(Some(col));
965                    field
966                } else if let Some(computed) = t.get_computed_field(&f.field.0) {
967                    // Check computed fields
968                    CoercibleField::from_computed_field(
969                        f.field.0.clone(),
970                        f.field.1.clone(),
971                        computed.function.clone(),
972                        computed.return_type.clone(),
973                    )
974                    // Computed fields don't need to_jsonb wrapper
975                } else {
976                    // Column not found - return error instead of creating unknown field
977                    return Err(Error::ColumnNotFound {
978                        table: t.qi().to_string(),
979                        column: f.field.0.to_string(),
980                    });
981                }
982            } else {
983                // No table provided - allow unknown fields (for unit tests, etc.)
984                CoercibleField::unknown(f.field.0.clone(), f.field.1.clone()).with_to_json(None)
985            };
986            Ok(CoercibleLogicTree::Stmnt(CoercibleFilter::Filter {
987                field,
988                op_expr: f.op_expr.clone(),
989            }))
990        })
991        .collect()
992}
993
994/// Resolve order parameters into coercible order terms
995///
996/// When a `table` is provided, order fields are resolved against the table's columns
997/// and computed fields so that `base_type` is set for proper type casting.
998fn resolve_orders(
999    orders: &[(Vec<CompactString>, Vec<OrderTerm>)],
1000    table: Option<&Table>,
1001) -> Vec<CoercibleOrderTerm> {
1002    // Only include root-level orders (empty embed path)
1003    orders
1004        .iter()
1005        .filter(|(path, _)| path.is_empty())
1006        .flat_map(|(_, terms)| resolve_order_terms(terms, table))
1007        .collect()
1008}
1009
1010/// Resolve order terms into coercible order terms
1011fn resolve_order_terms(terms: &[OrderTerm], table: Option<&Table>) -> Vec<CoercibleOrderTerm> {
1012    terms
1013        .iter()
1014        .map(|t| match t {
1015            OrderTerm::Term {
1016                field,
1017                direction,
1018                nulls,
1019            } => {
1020                let resolved_field = if let Some(t) = table {
1021                    // Check regular columns first
1022                    if let Some(col) = t.get_column(&field.0) {
1023                        CoercibleField::from_column(
1024                            field.0.clone(),
1025                            field.1.clone(),
1026                            col.data_type.clone(),
1027                        )
1028                        .with_to_json(Some(col))
1029                    } else if let Some(computed) = t.get_computed_field(&field.0) {
1030                        // Check computed fields
1031                        CoercibleField::from_computed_field(
1032                            field.0.clone(),
1033                            field.1.clone(),
1034                            computed.function.clone(),
1035                            computed.return_type.clone(),
1036                        )
1037                        // Computed fields don't need to_jsonb wrapper
1038                    } else {
1039                        CoercibleField::unknown(field.0.clone(), field.1.clone()).with_to_json(None)
1040                    }
1041                } else {
1042                    CoercibleField::unknown(field.0.clone(), field.1.clone()).with_to_json(None)
1043                };
1044                CoercibleOrderTerm::Term {
1045                    field: resolved_field,
1046                    direction: *direction,
1047                    nulls: *nulls,
1048                }
1049            }
1050            OrderTerm::RelationTerm {
1051                relation,
1052                field,
1053                direction,
1054                nulls,
1055            } => CoercibleOrderTerm::RelationTerm {
1056                relation: relation.clone(),
1057                rel_term: CoercibleField::unknown(field.0.clone(), field.1.clone())
1058                    .with_to_json(None),
1059                direction: *direction,
1060                nulls: *nulls,
1061            },
1062        })
1063        .collect()
1064}
1065
1066/// Resolve mutation columns from the table and payload columns
1067fn resolve_mutation_columns(
1068    table: &Table,
1069    payload_cols: &std::collections::HashSet<CompactString>,
1070) -> Vec<CoercibleField> {
1071    if payload_cols.is_empty() {
1072        // No &columns specified — use all table columns
1073        table
1074            .columns_list()
1075            .map(|col| {
1076                CoercibleField::from_column(
1077                    col.name.clone(),
1078                    Default::default(),
1079                    col.data_type.clone(),
1080                )
1081            })
1082            .collect()
1083    } else {
1084        // Use only the specified columns
1085        payload_cols
1086            .iter()
1087            .filter_map(|col_name| {
1088                table.get_column(col_name).map(|col| {
1089                    CoercibleField::from_column(
1090                        col.name.clone(),
1091                        Default::default(),
1092                        col.data_type.clone(),
1093                    )
1094                })
1095            })
1096            .collect()
1097    }
1098}
1099
1100/// Resolve RETURNING columns based on Prefer header
1101fn resolve_returning(table: &Table, prefs: &Preferences) -> Vec<CoercibleSelectField> {
1102    match prefs.representation {
1103        Some(PreferRepresentation::Full) | Some(PreferRepresentation::HeadersOnly) => {
1104            // Return all columns
1105            table
1106                .columns_list()
1107                .map(|col| CoercibleSelectField {
1108                    field: CoercibleField::from_column(
1109                        col.name.clone(),
1110                        Default::default(),
1111                        col.data_type.clone(),
1112                    ),
1113                    agg_function: None,
1114                    agg_cast: None,
1115                    cast: None,
1116                    alias: None,
1117                })
1118                .collect()
1119        }
1120        _ => vec![],
1121    }
1122}
1123
1124/// Restrict range based on max rows config
1125fn restrict_range(tree: &mut ReadPlanTree, max_rows: i64) {
1126    let plan = &mut tree.node;
1127    if plan.range.is_all() && plan.depth == 0 {
1128        plan.range.limit_to = Some(max_rows);
1129    }
1130}
1131
1132/// Resolve transaction mode from config and preferences
1133fn resolve_tx_mode(config: &AppConfig, prefs: &Preferences, is_read: bool) -> TxMode {
1134    let rollback = if config.db_tx_rollback_all {
1135        true
1136    } else if config.db_tx_allow_override {
1137        matches!(prefs.transaction, Some(PreferTransaction::Rollback))
1138    } else {
1139        false
1140    };
1141
1142    // Select isolation level based on operation type
1143    // Read operations use db_tx_read_isolation, write operations use db_tx_write_isolation
1144    let isolation_level = if is_read {
1145        config.db_tx_read_isolation
1146    } else {
1147        config.db_tx_write_isolation
1148    };
1149
1150    TxMode {
1151        isolation_level,
1152        rollback,
1153    }
1154}
1155
1156// ==========================================================================
1157// Tests
1158// ==========================================================================
1159
1160#[cfg(test)]
1161mod tests {
1162    use super::*;
1163    use crate::test_helpers::*;
1164    use std::collections::HashMap;
1165    use std::sync::Arc;
1166
1167    fn test_config() -> AppConfig {
1168        AppConfig {
1169            db_schemas: vec!["public".to_string()],
1170            ..Default::default()
1171        }
1172    }
1173
1174    fn test_schema_cache() -> SchemaCache {
1175        let mut tables = HashMap::new();
1176
1177        let users_table = test_table()
1178            .schema("public")
1179            .name("users")
1180            .pk_col("id")
1181            .column(
1182                test_column()
1183                    .name("id")
1184                    .data_type("integer")
1185                    .nullable(false)
1186                    .build(),
1187            )
1188            .column(test_column().name("name").data_type("text").build())
1189            .column(test_column().name("email").data_type("text").build())
1190            .build();
1191
1192        let posts_table = test_table()
1193            .schema("public")
1194            .name("posts")
1195            .pk_col("id")
1196            .column(
1197                test_column()
1198                    .name("id")
1199                    .data_type("integer")
1200                    .nullable(false)
1201                    .build(),
1202            )
1203            .column(test_column().name("user_id").data_type("integer").build())
1204            .column(test_column().name("title").data_type("text").build())
1205            .build();
1206
1207        tables.insert(users_table.qi(), users_table);
1208        tables.insert(posts_table.qi(), posts_table);
1209
1210        let rel = test_relationship()
1211            .table("public", "posts")
1212            .foreign_table("public", "users")
1213            .m2o("fk_posts_user", &[("user_id", "id")])
1214            .build();
1215
1216        let mut relationships = HashMap::new();
1217        let key = (
1218            QualifiedIdentifier::new("public", "posts"),
1219            "public".to_string(),
1220        );
1221        relationships.insert(key, vec![AnyRelationship::ForeignKey(rel)]);
1222
1223        // Also add reverse relationship (users -> posts as O2M)
1224        let rev_rel = test_relationship()
1225            .table("public", "users")
1226            .foreign_table("public", "posts")
1227            .o2m("fk_posts_user", &[("id", "user_id")])
1228            .build();
1229        let rev_key = (
1230            QualifiedIdentifier::new("public", "users"),
1231            "public".to_string(),
1232        );
1233        relationships.insert(rev_key, vec![AnyRelationship::ForeignKey(rev_rel)]);
1234
1235        let routine = test_routine()
1236            .schema("public")
1237            .name("get_user")
1238            .param(test_param().name("user_id").pg_type("integer").build())
1239            .returns_setof_composite("public", "users")
1240            .build();
1241
1242        let mut routines = HashMap::new();
1243        routines.insert(routine.qi(), vec![routine]);
1244
1245        SchemaCache {
1246            tables: Arc::new(tables),
1247            relationships: Arc::new(relationships),
1248            routines: Arc::new(routines),
1249            timezones: Arc::new(std::collections::HashSet::new()),
1250            representations: Arc::new(HashMap::new()),
1251            media_handlers: Arc::new(HashMap::new()),
1252        }
1253    }
1254
1255    #[test]
1256    fn test_find_table_exists() {
1257        let cache = test_schema_cache();
1258        let qi = QualifiedIdentifier::new("public", "users");
1259        let table = find_table(&cache, &qi);
1260        assert!(table.is_ok());
1261        assert_eq!(table.unwrap().name.as_str(), "users");
1262    }
1263
1264    #[test]
1265    fn test_find_table_not_found() {
1266        let cache = test_schema_cache();
1267        let qi = QualifiedIdentifier::new("public", "nonexistent");
1268        let result = find_table(&cache, &qi);
1269        assert!(result.is_err());
1270        assert!(matches!(result.unwrap_err(), Error::TableNotFound { .. }));
1271    }
1272
1273    #[test]
1274    fn test_find_proc_exists() {
1275        let cache = test_schema_cache();
1276        let qi = QualifiedIdentifier::new("public", "get_user");
1277        let proc = find_proc(&cache, &qi);
1278        assert!(proc.is_ok());
1279    }
1280
1281    #[test]
1282    fn test_find_proc_not_found() {
1283        let cache = test_schema_cache();
1284        let qi = QualifiedIdentifier::new("public", "nonexistent_func");
1285        let result = find_proc(&cache, &qi);
1286        assert!(result.is_err());
1287        assert!(matches!(
1288            result.unwrap_err(),
1289            Error::FunctionNotFound { .. }
1290        ));
1291    }
1292
1293    #[test]
1294    fn test_find_rels() {
1295        let cache = test_schema_cache();
1296        let source = QualifiedIdentifier::new("public", "users");
1297        let rels = find_rels(&cache, &source, "posts");
1298        assert_eq!(rels.len(), 1);
1299    }
1300
1301    #[test]
1302    fn test_resolve_select_fields() {
1303        use smallvec::SmallVec;
1304        let items = vec![
1305            SelectItem::Field {
1306                field: ("id".into(), SmallVec::new()),
1307                alias: None,
1308                cast: None,
1309                aggregate: None,
1310                aggregate_cast: None,
1311            },
1312            SelectItem::Field {
1313                field: ("name".into(), SmallVec::new()),
1314                alias: Some("user_name".into()),
1315                cast: Some("text".into()),
1316                aggregate: None,
1317                aggregate_cast: None,
1318            },
1319        ];
1320
1321        let resolved = resolve_select(&items, None).unwrap();
1322        assert_eq!(resolved.len(), 2);
1323        assert_eq!(resolved[0].field.name.as_str(), "id");
1324        assert!(resolved[0].alias.is_none());
1325        assert_eq!(resolved[1].field.name.as_str(), "name");
1326        assert_eq!(resolved[1].alias.as_deref(), Some("user_name"));
1327        assert_eq!(resolved[1].cast.as_deref(), Some("text"));
1328    }
1329
1330    #[test]
1331    fn test_resolve_filters() {
1332        use crate::api_request::types::{Filter, OpExpr, Operation, QuantOperator};
1333        use smallvec::SmallVec;
1334
1335        let filters = vec![Filter {
1336            field: ("id".into(), SmallVec::new()),
1337            op_expr: OpExpr::Expr {
1338                negated: false,
1339                operation: Operation::Quant(QuantOperator::Equal, None, "5".into()),
1340            },
1341        }];
1342
1343        let resolved = resolve_filters(&filters, None).unwrap();
1344        assert_eq!(resolved.len(), 1);
1345        assert!(matches!(resolved[0], CoercibleLogicTree::Stmnt(_)));
1346    }
1347
1348    #[test]
1349    fn test_resolve_filters_with_computed_field() {
1350        use crate::api_request::types::{Filter, OpExpr, Operation, QuantOperator};
1351        use crate::schema_cache::table::ComputedField;
1352        use crate::types::QualifiedIdentifier;
1353        use smallvec::SmallVec;
1354
1355        // Create a table with a computed field
1356        let mut table = test_table()
1357            .schema("test_api")
1358            .name("users")
1359            .column(test_column().name("id").data_type("integer").build())
1360            .build();
1361
1362        let func_qi = QualifiedIdentifier::new("test_api", "full_name");
1363        let computed = ComputedField {
1364            function: func_qi,
1365            return_type: "text".into(),
1366            returns_set: false,
1367        };
1368        table.computed_fields.insert("full_name".into(), computed);
1369
1370        let filters = vec![Filter {
1371            field: ("full_name".into(), SmallVec::new()),
1372            op_expr: OpExpr::Expr {
1373                negated: false,
1374                operation: Operation::Quant(QuantOperator::Equal, None, "John Doe".into()),
1375            },
1376        }];
1377
1378        let resolved = resolve_filters(&filters, Some(&table)).unwrap();
1379        assert_eq!(resolved.len(), 1);
1380
1381        if let CoercibleLogicTree::Stmnt(CoercibleFilter::Filter { field, .. }) = &resolved[0] {
1382            assert!(field.is_computed);
1383            assert_eq!(field.name.as_str(), "full_name");
1384            assert_eq!(field.base_type.as_deref(), Some("text"));
1385        } else {
1386            panic!("Expected Filter variant");
1387        }
1388    }
1389
1390    #[test]
1391    fn test_resolve_select_with_computed_field() {
1392        use crate::api_request::types::SelectItem;
1393        use crate::schema_cache::table::ComputedField;
1394        use crate::types::QualifiedIdentifier;
1395
1396        // Create a table with a computed field
1397        let mut table = test_table()
1398            .schema("test_api")
1399            .name("users")
1400            .column(test_column().name("id").data_type("integer").build())
1401            .column(test_column().name("name").data_type("text").build())
1402            .build();
1403
1404        let func_qi = QualifiedIdentifier::new("test_api", "full_name");
1405        let computed = ComputedField {
1406            function: func_qi,
1407            return_type: "text".into(),
1408            returns_set: false,
1409        };
1410        table.computed_fields.insert("full_name".into(), computed);
1411
1412        let items = vec![
1413            SelectItem::Field {
1414                field: ("id".into(), Default::default()),
1415                alias: None,
1416                cast: None,
1417                aggregate: None,
1418                aggregate_cast: None,
1419            },
1420            SelectItem::Field {
1421                field: ("full_name".into(), Default::default()),
1422                alias: None,
1423                cast: None,
1424                aggregate: None,
1425                aggregate_cast: None,
1426            },
1427        ];
1428
1429        let resolved = resolve_select(&items, Some(&table)).unwrap();
1430        assert_eq!(resolved.len(), 2);
1431
1432        // First field (id) should be a regular column
1433        assert!(!resolved[0].field.is_computed);
1434        assert_eq!(resolved[0].field.name.as_str(), "id");
1435
1436        // Second field (full_name) should be a computed field
1437        assert!(resolved[1].field.is_computed);
1438        assert_eq!(resolved[1].field.name.as_str(), "full_name");
1439        assert_eq!(resolved[1].field.base_type.as_deref(), Some("text"));
1440    }
1441
1442    #[test]
1443    fn test_resolve_select_computed_field_with_cast() {
1444        use crate::api_request::types::SelectItem;
1445        use crate::schema_cache::table::ComputedField;
1446        use crate::types::QualifiedIdentifier;
1447
1448        let mut table = test_table()
1449            .schema("test_api")
1450            .name("users")
1451            .column(test_column().name("id").data_type("integer").build())
1452            .build();
1453
1454        let func_qi = QualifiedIdentifier::new("test_api", "full_name");
1455        let computed = ComputedField {
1456            function: func_qi,
1457            return_type: "text".into(),
1458            returns_set: false,
1459        };
1460        table.computed_fields.insert("full_name".into(), computed);
1461
1462        let items = vec![SelectItem::Field {
1463            field: ("full_name".into(), Default::default()),
1464            alias: None,
1465            cast: Some("varchar".into()),
1466            aggregate: None,
1467            aggregate_cast: None,
1468        }];
1469
1470        let resolved = resolve_select(&items, Some(&table)).unwrap();
1471        assert_eq!(resolved.len(), 1);
1472        assert!(resolved[0].field.is_computed);
1473        assert_eq!(resolved[0].cast.as_deref(), Some("varchar"));
1474    }
1475
1476    #[test]
1477    fn test_resolve_order_with_computed_field() {
1478        use crate::api_request::types::OrderTerm;
1479        use crate::schema_cache::table::ComputedField;
1480        use crate::types::QualifiedIdentifier;
1481
1482        let mut table = test_table()
1483            .schema("test_api")
1484            .name("users")
1485            .column(test_column().name("id").data_type("integer").build())
1486            .build();
1487
1488        let func_qi = QualifiedIdentifier::new("test_api", "full_name");
1489        let computed = ComputedField {
1490            function: func_qi,
1491            return_type: "text".into(),
1492            returns_set: false,
1493        };
1494        table.computed_fields.insert("full_name".into(), computed);
1495
1496        let terms = vec![OrderTerm::Term {
1497            field: ("full_name".into(), Default::default()),
1498            direction: Some(crate::api_request::types::OrderDirection::Asc),
1499            nulls: None,
1500        }];
1501
1502        let resolved = resolve_order_terms(&terms, Some(&table));
1503        assert_eq!(resolved.len(), 1);
1504
1505        if let crate::plan::types::CoercibleOrderTerm::Term { field, .. } = &resolved[0] {
1506            assert!(field.is_computed);
1507            assert_eq!(field.name.as_str(), "full_name");
1508            assert_eq!(field.base_type.as_deref(), Some("text"));
1509        } else {
1510            panic!("Expected Term variant");
1511        }
1512    }
1513
1514    #[test]
1515    fn test_resolve_filters_with_computed_field_operators() {
1516        use crate::api_request::types::{Filter, OpExpr, Operation, QuantOperator};
1517        use crate::schema_cache::table::ComputedField;
1518        use crate::types::QualifiedIdentifier;
1519        use smallvec::SmallVec;
1520
1521        let mut table = test_table()
1522            .schema("test_api")
1523            .name("users")
1524            .column(test_column().name("id").data_type("integer").build())
1525            .build();
1526
1527        let func_qi = QualifiedIdentifier::new("test_api", "full_name");
1528        let computed = ComputedField {
1529            function: func_qi,
1530            return_type: "text".into(),
1531            returns_set: false,
1532        };
1533        table.computed_fields.insert("full_name".into(), computed);
1534
1535        // Test LIKE operator
1536        let filters = vec![Filter {
1537            field: ("full_name".into(), SmallVec::new()),
1538            op_expr: OpExpr::Expr {
1539                negated: false,
1540                operation: Operation::Quant(QuantOperator::Like, None, "John*".into()),
1541            },
1542        }];
1543
1544        let resolved = resolve_filters(&filters, Some(&table)).unwrap();
1545        assert_eq!(resolved.len(), 1);
1546
1547        if let crate::plan::types::CoercibleLogicTree::Stmnt(
1548            crate::plan::types::CoercibleFilter::Filter { field, .. },
1549        ) = &resolved[0]
1550        {
1551            assert!(field.is_computed);
1552            assert_eq!(field.name.as_str(), "full_name");
1553        } else {
1554            panic!("Expected Filter variant");
1555        }
1556    }
1557
1558    #[test]
1559    fn test_resolve_filters_computed_field_vs_column() {
1560        use crate::api_request::types::{Filter, OpExpr, Operation, QuantOperator};
1561        use crate::schema_cache::table::ComputedField;
1562        use crate::types::QualifiedIdentifier;
1563        use smallvec::SmallVec;
1564
1565        let mut table = test_table()
1566            .schema("test_api")
1567            .name("users")
1568            .column(test_column().name("name").data_type("text").build())
1569            .build();
1570
1571        let func_qi = QualifiedIdentifier::new("test_api", "full_name");
1572        let computed = ComputedField {
1573            function: func_qi,
1574            return_type: "text".into(),
1575            returns_set: false,
1576        };
1577        table.computed_fields.insert("full_name".into(), computed);
1578
1579        // Filter by regular column
1580        let filters1 = vec![Filter {
1581            field: ("name".into(), SmallVec::new()),
1582            op_expr: OpExpr::Expr {
1583                negated: false,
1584                operation: Operation::Quant(QuantOperator::Equal, None, "John".into()),
1585            },
1586        }];
1587        let resolved1 = resolve_filters(&filters1, Some(&table)).unwrap();
1588        assert_eq!(resolved1.len(), 1);
1589        if let crate::plan::types::CoercibleLogicTree::Stmnt(
1590            crate::plan::types::CoercibleFilter::Filter { field, .. },
1591        ) = &resolved1[0]
1592        {
1593            assert!(!field.is_computed);
1594            assert_eq!(field.name.as_str(), "name");
1595        }
1596
1597        // Filter by computed field
1598        let filters2 = vec![Filter {
1599            field: ("full_name".into(), SmallVec::new()),
1600            op_expr: OpExpr::Expr {
1601                negated: false,
1602                operation: Operation::Quant(QuantOperator::Equal, None, "John Doe".into()),
1603            },
1604        }];
1605        let resolved2 = resolve_filters(&filters2, Some(&table)).unwrap();
1606        assert_eq!(resolved2.len(), 1);
1607        if let crate::plan::types::CoercibleLogicTree::Stmnt(
1608            crate::plan::types::CoercibleFilter::Filter { field, .. },
1609        ) = &resolved2[0]
1610        {
1611            assert!(field.is_computed);
1612            assert_eq!(field.name.as_str(), "full_name");
1613        }
1614    }
1615
1616    #[test]
1617    fn test_action_plan_relation_read() {
1618        let config = test_config();
1619        let cache = test_schema_cache();
1620        let prefs = Preferences::default();
1621        let body = bytes::Bytes::new();
1622
1623        let api_req = crate::api_request::from_request(
1624            &config,
1625            &prefs,
1626            "GET",
1627            "/users",
1628            "select=id,name",
1629            &[("accept".to_string(), "application/json".to_string())],
1630            body,
1631        )
1632        .unwrap();
1633
1634        let plan = action_plan(&config, &api_req, &cache).unwrap();
1635        assert!(matches!(
1636            plan,
1637            ActionPlan::Db(DbActionPlan::DbCrud {
1638                plan: CrudPlan::WrappedReadPlan { .. },
1639                ..
1640            })
1641        ));
1642    }
1643
1644    #[test]
1645    fn test_action_plan_relation_delete() {
1646        let config = test_config();
1647        let cache = test_schema_cache();
1648        let prefs = Preferences::default();
1649        let body = bytes::Bytes::new();
1650
1651        let api_req = crate::api_request::from_request(
1652            &config,
1653            &prefs,
1654            "DELETE",
1655            "/users",
1656            "id=eq.1",
1657            &[],
1658            body,
1659        )
1660        .unwrap();
1661
1662        let plan = action_plan(&config, &api_req, &cache).unwrap();
1663        assert!(matches!(
1664            plan,
1665            ActionPlan::Db(DbActionPlan::DbCrud {
1666                plan: CrudPlan::MutateReadPlan {
1667                    mutation: Mutation::MutationDelete,
1668                    ..
1669                },
1670                ..
1671            })
1672        ));
1673    }
1674
1675    #[test]
1676    fn test_action_plan_schema_info() {
1677        let config = test_config();
1678        let cache = test_schema_cache();
1679        let prefs = Preferences::default();
1680        let body = bytes::Bytes::new();
1681
1682        let api_req =
1683            crate::api_request::from_request(&config, &prefs, "OPTIONS", "/", "", &[], body)
1684                .unwrap();
1685
1686        let plan = action_plan(&config, &api_req, &cache).unwrap();
1687        assert!(matches!(plan, ActionPlan::NoDb(InfoPlan::SchemaInfoPlan)));
1688    }
1689
1690    #[test]
1691    fn test_action_plan_relation_info() {
1692        let config = test_config();
1693        let cache = test_schema_cache();
1694        let prefs = Preferences::default();
1695        let body = bytes::Bytes::new();
1696
1697        let api_req =
1698            crate::api_request::from_request(&config, &prefs, "OPTIONS", "/users", "", &[], body)
1699                .unwrap();
1700
1701        let plan = action_plan(&config, &api_req, &cache).unwrap();
1702        assert!(matches!(plan, ActionPlan::NoDb(InfoPlan::RelInfoPlan(_))));
1703    }
1704
1705    #[test]
1706    fn test_action_plan_with_embed() {
1707        let config = test_config();
1708        let cache = test_schema_cache();
1709        let prefs = Preferences::default();
1710        let body = bytes::Bytes::new();
1711
1712        let api_req = crate::api_request::from_request(
1713            &config,
1714            &prefs,
1715            "GET",
1716            "/users",
1717            "select=id,name,posts(id,title)",
1718            &[("accept".to_string(), "application/json".to_string())],
1719            body,
1720        )
1721        .unwrap();
1722
1723        let plan = action_plan(&config, &api_req, &cache).unwrap();
1724        if let ActionPlan::Db(DbActionPlan::DbCrud {
1725            plan: CrudPlan::WrappedReadPlan { read_plan, .. },
1726            ..
1727        }) = plan
1728        {
1729            assert_eq!(read_plan.node_count(), 2); // root + posts child
1730            assert_eq!(read_plan.children().len(), 1);
1731            assert_eq!(read_plan.children()[0].node.rel_name.as_str(), "posts");
1732        } else {
1733            panic!("Expected WrappedReadPlan");
1734        }
1735    }
1736
1737    #[test]
1738    fn test_action_plan_rpc() {
1739        let config = test_config();
1740        let cache = test_schema_cache();
1741        let prefs = Preferences::default();
1742        let body = bytes::Bytes::new();
1743
1744        let api_req = crate::api_request::from_request(
1745            &config,
1746            &prefs,
1747            "GET",
1748            "/rpc/get_user",
1749            "user_id=1",
1750            &[("accept".to_string(), "application/json".to_string())],
1751            body,
1752        )
1753        .unwrap();
1754
1755        let plan = action_plan(&config, &api_req, &cache).unwrap();
1756        assert!(matches!(
1757            plan,
1758            ActionPlan::Db(DbActionPlan::DbCrud {
1759                plan: CrudPlan::CallReadPlan { .. },
1760                ..
1761            })
1762        ));
1763    }
1764
1765    #[test]
1766    fn test_action_plan_table_not_found() {
1767        let config = test_config();
1768        let cache = test_schema_cache();
1769        let prefs = Preferences::default();
1770        let body = bytes::Bytes::new();
1771
1772        let api_req =
1773            crate::api_request::from_request(&config, &prefs, "GET", "/nonexistent", "", &[], body)
1774                .unwrap();
1775
1776        let result = action_plan(&config, &api_req, &cache);
1777        assert!(result.is_err());
1778        assert!(matches!(result.unwrap_err(), Error::TableNotFound { .. }));
1779    }
1780
1781    #[test]
1782    fn test_tx_mode_default() {
1783        let config = test_config();
1784        let prefs = Preferences::default();
1785        let tx = resolve_tx_mode(&config, &prefs, true);
1786        assert!(!tx.rollback);
1787    }
1788
1789    #[test]
1790    fn test_tx_mode_rollback_all() {
1791        let mut config = test_config();
1792        config.db_tx_rollback_all = true;
1793        let prefs = Preferences::default();
1794        let tx = resolve_tx_mode(&config, &prefs, false);
1795        assert!(tx.rollback);
1796    }
1797
1798    #[test]
1799    fn test_resolve_mutation_columns_all() {
1800        let table = test_table()
1801            .column(test_column().name("id").data_type("integer").build())
1802            .column(test_column().name("name").data_type("text").build())
1803            .build();
1804
1805        let cols = resolve_mutation_columns(&table, &std::collections::HashSet::new());
1806        assert_eq!(cols.len(), 2);
1807    }
1808
1809    #[test]
1810    fn test_resolve_mutation_columns_subset() {
1811        let table = test_table()
1812            .column(test_column().name("id").data_type("integer").build())
1813            .column(test_column().name("name").data_type("text").build())
1814            .column(test_column().name("email").data_type("text").build())
1815            .build();
1816
1817        let mut payload_cols = std::collections::HashSet::new();
1818        payload_cols.insert(CompactString::from("name"));
1819        payload_cols.insert(CompactString::from("email"));
1820
1821        let cols = resolve_mutation_columns(&table, &payload_cols);
1822        assert_eq!(cols.len(), 2);
1823    }
1824
1825    #[test]
1826    fn test_resolve_select_composite_type() {
1827        use crate::api_request::types::SelectItem;
1828        use crate::api_request::types::{JsonOperand, JsonOperation, JsonPath};
1829        use smallvec::SmallVec;
1830
1831        let mut table = test_table()
1832            .schema("test_api")
1833            .name("countries")
1834            .column(
1835                test_column()
1836                    .name("location")
1837                    .data_type("test_api.coordinates")
1838                    .build(),
1839            )
1840            .build();
1841
1842        // Mark column as composite
1843        {
1844            use indexmap::IndexMap;
1845            use std::sync::Arc;
1846            let mut new_columns = IndexMap::new();
1847            for (k, v) in table.columns.iter() {
1848                if k.as_str() == "location" {
1849                    let mut new_col = v.clone();
1850                    new_col.is_composite = true;
1851                    new_col.composite_type_schema = Some("test_api".into());
1852                    new_col.composite_type_name = Some("coordinates".into());
1853                    new_columns.insert(k.clone(), new_col);
1854                } else {
1855                    new_columns.insert(k.clone(), v.clone());
1856                }
1857            }
1858            table.columns = Arc::new(new_columns);
1859        }
1860
1861        let mut json_path: JsonPath = SmallVec::new();
1862        json_path.push(JsonOperation::Arrow2(JsonOperand::Key("lat".into())));
1863
1864        let items = vec![SelectItem::Field {
1865            field: ("location".into(), json_path),
1866            alias: None,
1867            cast: None,
1868            aggregate: None,
1869            aggregate_cast: None,
1870        }];
1871
1872        let resolved = resolve_select(&items, Some(&table)).unwrap();
1873        assert_eq!(resolved.len(), 1);
1874        assert!(
1875            resolved[0].field.to_json,
1876            "Composite type with JSON path should have to_json=true"
1877        );
1878    }
1879
1880    #[test]
1881    fn test_resolve_select_array_type() {
1882        use crate::api_request::types::SelectItem;
1883        use crate::api_request::types::{JsonOperand, JsonOperation, JsonPath};
1884        use smallvec::SmallVec;
1885
1886        let table = test_table()
1887            .schema("test_api")
1888            .name("countries")
1889            .column(test_column().name("languages").data_type("text[]").build())
1890            .build();
1891
1892        let mut json_path: JsonPath = SmallVec::new();
1893        json_path.push(JsonOperation::Arrow(JsonOperand::Idx("0".into())));
1894
1895        let items = vec![SelectItem::Field {
1896            field: ("languages".into(), json_path),
1897            alias: None,
1898            cast: None,
1899            aggregate: None,
1900            aggregate_cast: None,
1901        }];
1902
1903        let resolved = resolve_select(&items, Some(&table)).unwrap();
1904        assert_eq!(resolved.len(), 1);
1905        assert!(
1906            resolved[0].field.to_json,
1907            "Array type with JSON path should have to_json=true"
1908        );
1909    }
1910
1911    #[test]
1912    fn test_resolve_select_json_type_no_wrapper() {
1913        use crate::api_request::types::SelectItem;
1914        use crate::api_request::types::{JsonOperand, JsonOperation, JsonPath};
1915        use smallvec::SmallVec;
1916
1917        let table = test_table()
1918            .schema("test_api")
1919            .name("posts")
1920            .column(test_column().name("metadata").data_type("jsonb").build())
1921            .build();
1922
1923        let mut json_path: JsonPath = SmallVec::new();
1924        json_path.push(JsonOperation::Arrow2(JsonOperand::Key("title".into())));
1925
1926        let items = vec![SelectItem::Field {
1927            field: ("metadata".into(), json_path),
1928            alias: None,
1929            cast: None,
1930            aggregate: None,
1931            aggregate_cast: None,
1932        }];
1933
1934        let resolved = resolve_select(&items, Some(&table)).unwrap();
1935        assert_eq!(resolved.len(), 1);
1936        assert!(
1937            !resolved[0].field.to_json,
1938            "JSON/JSONB type should have to_json=false"
1939        );
1940    }
1941
1942    #[test]
1943    fn test_resolve_filters_composite_type() {
1944        use crate::api_request::types::{Filter, OpExpr, Operation, QuantOperator};
1945        use crate::api_request::types::{JsonOperand, JsonOperation, JsonPath};
1946        use smallvec::SmallVec;
1947
1948        let mut table = test_table()
1949            .schema("test_api")
1950            .name("countries")
1951            .column(
1952                test_column()
1953                    .name("location")
1954                    .data_type("test_api.coordinates")
1955                    .build(),
1956            )
1957            .build();
1958
1959        // Mark column as composite
1960        {
1961            use indexmap::IndexMap;
1962            use std::sync::Arc;
1963            let mut new_columns = IndexMap::new();
1964            for (k, v) in table.columns.iter() {
1965                if k.as_str() == "location" {
1966                    let mut new_col = v.clone();
1967                    new_col.is_composite = true;
1968                    new_col.composite_type_schema = Some("test_api".into());
1969                    new_col.composite_type_name = Some("coordinates".into());
1970                    new_columns.insert(k.clone(), new_col);
1971                } else {
1972                    new_columns.insert(k.clone(), v.clone());
1973                }
1974            }
1975            table.columns = Arc::new(new_columns);
1976        }
1977
1978        let mut json_path: JsonPath = SmallVec::new();
1979        json_path.push(JsonOperation::Arrow2(JsonOperand::Key("lat".into())));
1980
1981        let filters = vec![Filter {
1982            field: ("location".into(), json_path),
1983            op_expr: OpExpr::Expr {
1984                negated: false,
1985                operation: Operation::Quant(QuantOperator::GreaterThanEqual, None, "19.0".into()),
1986            },
1987        }];
1988
1989        let resolved = resolve_filters(&filters, Some(&table)).unwrap();
1990        assert_eq!(resolved.len(), 1);
1991
1992        if let crate::plan::types::CoercibleLogicTree::Stmnt(
1993            crate::plan::types::CoercibleFilter::Filter { field, .. },
1994        ) = &resolved[0]
1995        {
1996            assert!(
1997                field.to_json,
1998                "Composite type filter with JSON path should have to_json=true"
1999            );
2000        } else {
2001            panic!("Expected Filter variant");
2002        }
2003    }
2004
2005    #[test]
2006    fn test_validate_cast_type() {
2007        // Valid cast types
2008        assert!(validate_cast_type("text").is_ok());
2009        assert!(validate_cast_type("integer").is_ok());
2010        assert!(validate_cast_type("bigint").is_ok());
2011        assert!(validate_cast_type("character varying").is_ok());
2012        assert!(validate_cast_type("numeric(10,2)").is_ok());
2013        assert!(validate_cast_type("text[]").is_ok());
2014        assert!(validate_cast_type("_int4").is_ok());
2015
2016        // Invalid cast types
2017        assert!(validate_cast_type("").is_err());
2018        assert!(validate_cast_type("invalid@type").is_err());
2019        assert!(validate_cast_type("type;drop table").is_err());
2020    }
2021
2022    #[test]
2023    fn test_resolve_filters_rejects_cast() {
2024        // Note: Cast validation now happens in query_params parsing (parse_tree_path),
2025        // not in resolve_filters. This test verifies that resolve_filters correctly
2026        // handles fields that have already been validated.
2027        // If a field with "::" somehow gets through, it would be treated as an unknown
2028        // field name, which would fail column lookup.
2029        use crate::api_request::types::{Filter, OpExpr, Operation, QuantOperator};
2030        use smallvec::SmallVec;
2031
2032        let table = test_table()
2033            .schema("test_api")
2034            .name("users")
2035            .column(test_column().name("id").data_type("integer").build())
2036            .build();
2037
2038        // Even if "::" is in the field name, it won't match "id" column
2039        let filters = vec![Filter {
2040            field: ("id::text".into(), SmallVec::new()),
2041            op_expr: OpExpr::Expr {
2042                negated: false,
2043                operation: Operation::Quant(QuantOperator::Equal, None, "1".into()),
2044            },
2045        }];
2046
2047        let result = resolve_filters(&filters, Some(&table));
2048        // Should fail because "id::text" doesn't match column "id"
2049        assert!(
2050            result.is_err(),
2051            "Should fail when column name doesn't match"
2052        );
2053
2054        if let Err(Error::ColumnNotFound { column, .. }) = result {
2055            assert_eq!(column, "id::text");
2056        } else {
2057            panic!("Expected ColumnNotFound error, got: {:?}", result);
2058        }
2059    }
2060}