Skip to main content

citadel_sql/
parser.rs

1//! SQL parser: converts SQL strings into the internal AST.
2
3use sqlparser::ast as sp;
4
5use crate::error::{Result, SqlError};
6use crate::types::{DataType, Value};
7
/// One parsed SQL statement in this module's internal AST.
///
/// `parse_sql` returns a single value of this type and `parse_sql_multi`
/// returns a list of them. Each variant carries the payload type declared
/// next to it in this file.
#[derive(Debug, Clone)]
pub enum Statement {
    CreateTable(CreateTableStmt),
    DropTable(DropTableStmt),
    CreateIndex(CreateIndexStmt),
    DropIndex(DropIndexStmt),
    CreateView(CreateViewStmt),
    DropView(DropViewStmt),
    CreateMaterializedView(Box<CreateMatviewStmt>),
    /// Produced by `try_parse_refresh_matview`, not by the sqlparser front end.
    RefreshMaterializedView(RefreshMatviewStmt),
    DropMaterializedView(DropMatviewStmt),
    CreateTrigger(Box<CreateTriggerStmt>),
    DropTrigger(DropTriggerStmt),
    AlterTable(Box<AlterTableStmt>),
    Insert(InsertStmt),
    Select(Box<SelectQuery>),
    Update(UpdateStmt),
    Delete(DeleteStmt),
    Truncate(TruncateStmt),
    /// Transaction start together with the access mode attached to it.
    Begin { access_mode: BeginAccessMode },
    Commit,
    Rollback,
    // NOTE(review): the three savepoint variants each carry one string,
    // presumably the savepoint name — confirm against the statement converter.
    Savepoint(String),
    ReleaseSavepoint(String),
    RollbackTo(String),
    // NOTE(review): presumably the requested time-zone name — confirm.
    SetTimezone(String),
    /// Wraps another statement; `visit_exprs_stmt` recurses into the inner one.
    Explain(Box<Statement>),
}
36
/// Access mode attached to [`Statement::Begin`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BeginAccessMode {
    // NOTE(review): presumably "no explicit mode was written in the SQL text" —
    // confirm against the statement converter.
    Default,
    ReadWrite,
    ReadOnly,
}
43
/// Payload of [`Statement::AlterTable`]: a table name plus the single
/// operation to apply to it.
#[derive(Debug, Clone)]
pub struct AlterTableStmt {
    /// Name of the table being altered.
    pub table: String,
    /// The alteration to perform.
    pub op: AlterTableOp,
}
49
/// The operation carried by [`AlterTableStmt::op`].
#[derive(Debug, Clone)]
pub enum AlterTableOp {
    /// Adds a column, optionally with a foreign-key definition.
    AddColumn {
        column: Box<ColumnSpec>,
        foreign_key: Option<ForeignKeyDef>,
        // NOTE(review): presumably set by an `IF NOT EXISTS` clause — confirm.
        if_not_exists: bool,
    },
    /// Drops the column called `name`.
    DropColumn {
        name: String,
        // NOTE(review): presumably set by an `IF EXISTS` clause — confirm.
        if_exists: bool,
    },
    /// Renames column `old_name` to `new_name`.
    RenameColumn {
        old_name: String,
        new_name: String,
    },
    /// Renames the table itself to `new_name`.
    RenameTable {
        new_name: String,
    },
    /// Disables the trigger called `name`.
    DisableTrigger {
        name: String,
    },
    /// Enables the trigger called `name`.
    EnableTrigger {
        name: String,
    },
    DisableAllTriggers,
    EnableAllTriggers,
}
77
/// Payload of [`Statement::CreateTable`].
#[derive(Debug, Clone)]
pub struct CreateTableStmt {
    /// Name of the table to create.
    pub name: String,
    /// Column definitions.
    pub columns: Vec<ColumnSpec>,
    /// Names of the primary-key columns.
    pub primary_key: Vec<String>,
    pub if_not_exists: bool,
    /// Table-level CHECK constraints.
    pub check_constraints: Vec<TableCheckConstraint>,
    /// Foreign-key definitions.
    pub foreign_keys: Vec<ForeignKeyDef>,
    /// Unique-index definitions attached to the table.
    pub unique_indices: Vec<UniqueIndexDef>,
    // NOTE(review): `strict` / `temporary` presumably mirror the corresponding
    // CREATE TABLE modifiers — confirm against the statement converter.
    pub strict: bool,
    pub temporary: bool,
}
90
/// One entry of [`CreateTableStmt::unique_indices`]: an optionally named list
/// of column names.
#[derive(Debug, Clone)]
pub struct UniqueIndexDef {
    /// Name, when one was given.
    pub name: Option<String>,
    /// Names of the covered columns.
    pub columns: Vec<String>,
}
96
/// One entry of [`CreateTableStmt::check_constraints`].
#[derive(Debug, Clone)]
pub struct TableCheckConstraint {
    /// Constraint name, when one was given.
    pub name: Option<String>,
    /// Parsed constraint expression.
    pub expr: Expr,
    // NOTE(review): presumably the SQL text that `expr` was parsed from — confirm.
    pub sql: String,
}
103
/// Foreign-key definition, used by [`CreateTableStmt::foreign_keys`] and by
/// [`AlterTableOp::AddColumn`].
#[derive(Debug, Clone)]
pub struct ForeignKeyDef {
    /// Constraint name, when one was given.
    pub name: Option<String>,
    /// Referencing column names.
    pub columns: Vec<String>,
    /// Name of the referenced table.
    pub foreign_table: String,
    /// Referenced column names.
    pub referred_columns: Vec<String>,
    /// Action attached to deletes of the referenced row.
    pub on_delete: ReferentialAction,
    /// Action attached to updates of the referenced row.
    pub on_update: ReferentialAction,
    // NOTE(review): presumably mirror `DEFERRABLE` / `INITIALLY DEFERRED` — confirm.
    pub deferrable: bool,
    pub initially_deferred: bool,
}
115
/// Action attached to a foreign key's `on_delete` / `on_update` slots.
///
/// The enum is `#[repr(u8)]` with explicit discriminants, and
/// [`ReferentialAction::from_tag`] maps such a discriminant back to a variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum ReferentialAction {
    NoAction = 0,
    Restrict = 1,
    Cascade = 2,
    SetNull = 3,
    SetDefault = 4,
}

impl ReferentialAction {
    /// Decodes a `u8` tag into the variant whose discriminant equals it.
    ///
    /// Returns `None` when `tag` is not one of the declared discriminants.
    pub fn from_tag(tag: u8) -> Option<Self> {
        // Indexed by discriminant; must stay in declaration order.
        const BY_TAG: [ReferentialAction; 5] = [
            ReferentialAction::NoAction,
            ReferentialAction::Restrict,
            ReferentialAction::Cascade,
            ReferentialAction::SetNull,
            ReferentialAction::SetDefault,
        ];
        BY_TAG.get(usize::from(tag)).copied()
    }
}
138
/// Kind of a generated column; stored in [`ColumnSpec::generated_kind`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GeneratedKind {
    Stored,
    Virtual,
}
144
/// Column definition, used by [`CreateTableStmt::columns`] and by
/// [`AlterTableOp::AddColumn`].
///
/// NOTE(review): the `*_expr` / `*_sql` pairs presumably hold a parsed
/// expression and the SQL text it came from — confirm against the converter.
#[derive(Debug, Clone)]
pub struct ColumnSpec {
    /// Column name.
    pub name: String,
    /// Declared data type.
    pub data_type: DataType,
    pub nullable: bool,
    pub is_primary_key: bool,
    /// Default-value expression, if any.
    pub default_expr: Option<Expr>,
    pub default_sql: Option<String>,
    /// Column-level CHECK expression, if any.
    pub check_expr: Option<Expr>,
    pub check_sql: Option<String>,
    pub check_name: Option<String>,
    /// Generation expression, if any.
    pub generated_expr: Option<Expr>,
    pub generated_sql: Option<String>,
    pub generated_kind: Option<GeneratedKind>,
    /// Collation attached to the column.
    pub collation: crate::types::Collation,
}
161
/// Payload of [`Statement::DropTable`].
#[derive(Debug, Clone)]
pub struct DropTableStmt {
    /// Name of the table to drop.
    pub name: String,
    pub if_exists: bool,
}
167
/// Payload of [`Statement::Truncate`].
#[derive(Debug, Clone)]
pub struct TruncateStmt {
    /// Names of the tables to truncate.
    pub tables: Vec<String>,
}
172
/// Payload of [`Statement::CreateIndex`].
#[derive(Debug, Clone)]
pub struct CreateIndexStmt {
    /// Name of the index to create.
    pub index_name: String,
    /// Name of the indexed table.
    pub table_name: String,
    /// Per-key column name for `IndexColSpec::Column` entries; ignored for `Expr` entries
    /// (those resolve through `key_exprs[i]`).
    pub columns: Vec<String>,
    /// Parallel to `columns`: `None` means the key is a column reference, `Some((expr, sql))`
    /// means the key is an expression. Empty Vec = all-column index.
    pub key_exprs: Vec<Option<(Expr, String)>>,
    pub unique: bool,
    pub if_not_exists: bool,
    // NOTE(review): `predicate_sql` / `predicate_expr` presumably describe a
    // partial-index predicate as text and as parsed expression — confirm.
    pub predicate_sql: Option<String>,
    pub predicate_expr: Option<Expr>,
    // NOTE(review): presumably one collation per key, parallel to `columns` — confirm.
    pub collations: Vec<crate::types::Collation>,
    pub kind: crate::types::IndexKind,
    /// ANN-only: filter-column names from `WITH (filters = '...')`, resolved to
    /// schema column indices in `build_index_def_for_create`. Empty otherwise.
    pub ann_filter_cols: Vec<String>,
    pub concurrently: bool,
}
194
/// Payload of [`Statement::CreateMaterializedView`].
#[derive(Debug, Clone)]
pub struct CreateMatviewStmt {
    /// Name of the materialized view.
    pub name: String,
    // NOTE(review): presumably the SQL text that `select_parsed` came from — confirm.
    pub select_sql: String,
    /// Parsed defining query.
    pub select_parsed: SelectQuery,
    /// `apply_no_data_flags` overwrites this with the negation of the
    /// statement's "WITH NO DATA" flag after parsing.
    pub with_data: bool,
    pub if_not_exists: bool,
}
203
/// Payload of [`Statement::RefreshMaterializedView`]; built by
/// `try_parse_refresh_matview`.
#[derive(Debug, Clone)]
pub struct RefreshMatviewStmt {
    /// View name as extracted from the ASCII-lowercased statement text.
    pub name: String,
    /// Whether the `CONCURRENTLY` keyword preceded the name.
    pub concurrently: bool,
}
209
/// Payload of [`Statement::DropMaterializedView`].
#[derive(Debug, Clone)]
pub struct DropMatviewStmt {
    /// Name of the materialized view to drop.
    pub name: String,
    pub if_exists: bool,
    pub cascade: bool,
}
216
/// Timing of a trigger; stored in [`CreateTriggerStmt::timing`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TriggerTiming {
    Before,
    After,
    InsteadOf,
}
223
/// Event a trigger reacts to; stored in [`CreateTriggerStmt::events`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TriggerEvent {
    Insert,
    // NOTE(review): the strings are presumably the `UPDATE OF` column list,
    // empty when no list was given — confirm against the converter.
    Update(Vec<String>),
    Delete,
}
230
/// Row-level versus statement-level firing; stored in
/// [`CreateTriggerStmt::granularity`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TriggerGranularity {
    ForEachRow,
    ForEachStatement,
}
236
/// Optional aliases for the new/old transition tables; stored in
/// [`CreateTriggerStmt::referencing`].
#[derive(Debug, Clone)]
pub struct TransitionTables {
    // NOTE(review): presumably from `REFERENCING NEW TABLE AS ...` /
    // `OLD TABLE AS ...` — confirm against the converter.
    pub new_table_alias: Option<String>,
    pub old_table_alias: Option<String>,
}
242
/// Payload of [`Statement::CreateTrigger`].
#[derive(Debug, Clone)]
pub struct CreateTriggerStmt {
    /// Trigger name.
    pub name: String,
    pub timing: TriggerTiming,
    /// Events the trigger is attached to.
    pub events: Vec<TriggerEvent>,
    // NOTE(review): presumably the table or view the trigger is defined on — confirm.
    pub target: String,
    pub granularity: TriggerGranularity,
    pub referencing: Option<TransitionTables>,
    // NOTE(review): `when_sql` / `when_expr` and `body_sql` / `body` presumably
    // pair the SQL text with its parsed form — confirm.
    pub when_sql: Option<String>,
    pub when_expr: Option<Expr>,
    pub body_sql: String,
    /// Parsed statements making up the trigger body.
    pub body: Vec<Statement>,
    pub if_not_exists: bool,
}
257
/// Payload of [`Statement::DropTrigger`].
#[derive(Debug, Clone)]
pub struct DropTriggerStmt {
    /// Name of the trigger to drop.
    pub name: String,
    /// Table name, when the statement specified one.
    pub table: Option<String>,
    pub if_exists: bool,
}
264
/// Payload of [`Statement::DropIndex`].
#[derive(Debug, Clone)]
pub struct DropIndexStmt {
    /// Name of the index to drop.
    pub index_name: String,
    pub if_exists: bool,
}
270
/// Payload of [`Statement::CreateView`].
#[derive(Debug, Clone)]
pub struct CreateViewStmt {
    /// Name of the view.
    pub name: String,
    // NOTE(review): presumably the SQL text of the view's defining query — confirm.
    pub sql: String,
    /// Column aliases declared for the view, if any.
    pub column_aliases: Vec<String>,
    pub or_replace: bool,
    pub if_not_exists: bool,
}
279
/// Payload of [`Statement::DropView`].
#[derive(Debug, Clone)]
pub struct DropViewStmt {
    /// Name of the view to drop.
    pub name: String,
    pub if_exists: bool,
}
285
/// Where an INSERT's rows come from; stored in [`InsertStmt::source`].
#[derive(Debug, Clone)]
pub enum InsertSource {
    /// Explicit rows: the outer `Vec` is the rows, the inner `Vec` one row's expressions.
    Values(Vec<Vec<Expr>>),
    /// Rows produced by a query.
    Select(Box<SelectQuery>),
}
291
/// Payload of [`Statement::Insert`]; also usable as a query body through
/// [`QueryBody::Insert`].
#[derive(Debug, Clone)]
pub struct InsertStmt {
    /// Target table name.
    pub table: String,
    /// Target column names.
    pub columns: Vec<String>,
    /// Row source.
    pub source: InsertSource,
    /// Conflict-handling clause, if any.
    pub on_conflict: Option<OnConflictClause>,
    /// Projection of a RETURNING clause, if any.
    pub returning: Option<Vec<SelectColumn>>,
}
300
/// Conflict-handling clause of an INSERT; stored in [`InsertStmt::on_conflict`].
#[derive(Debug, Clone)]
pub struct OnConflictClause {
    /// Conflict target, when one was specified.
    pub target: Option<ConflictTarget>,
    /// What to do when a conflict occurs.
    pub action: OnConflictAction,
}
306
/// Target of an [`OnConflictClause`]: either a column list or a named constraint.
#[derive(Debug, Clone)]
pub enum ConflictTarget {
    Columns(Vec<String>),
    Constraint(String),
}
312
/// Action of an [`OnConflictClause`].
#[derive(Debug, Clone)]
pub enum OnConflictAction {
    DoNothing,
    /// Update with `(column name, value expression)` assignments, optionally
    /// guarded by a condition.
    DoUpdate {
        assignments: Vec<(String, Expr)>,
        where_clause: Option<Expr>,
    },
}
321
/// Reference to a named table source; used by [`JoinClause::table`].
#[derive(Debug, Clone)]
pub struct TableRef {
    pub name: String,
    /// Alias, when one was given.
    pub alias: Option<String>,
    // NOTE(review): presumably the argument list when the source is written as
    // a function call (table-valued function) — confirm.
    pub args: Option<Vec<Expr>>,
}
328
/// A subquery used as a table source, always carrying an alias; referenced
/// from [`SelectStmt::from_subquery`] and [`JoinClause::subquery`].
#[derive(Debug, Clone)]
pub struct DerivedTable {
    /// The subquery itself.
    pub query: Box<SelectQuery>,
    pub lateral: bool,
    pub alias: String,
}
335
/// Kind of join; stored in [`JoinClause::join_type`].
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum JoinType {
    Inner,
    Cross,
    Left,
    Right,
    FullOuter,
}
344
/// One join of a [`SelectStmt`] (see [`SelectStmt::joins`]).
#[derive(Debug, Clone)]
pub struct JoinClause {
    pub join_type: JoinType,
    pub table: TableRef,
    // NOTE(review): when `Some`, the joined source is presumably this derived
    // table rather than `table` — confirm how `table` is filled in that case.
    pub subquery: Option<Box<DerivedTable>>,
    /// Join condition, if any.
    pub on_clause: Option<Expr>,
}
352
/// A single (non-compound) SELECT; wrapped by [`QueryBody::Select`] and used
/// directly as the subquery type inside [`Expr`].
#[derive(Debug, Clone)]
pub struct SelectStmt {
    /// Projection list.
    pub columns: Vec<SelectColumn>,
    // NOTE(review): `from` is a plain string; how a SELECT without FROM, or one
    // whose source is `from_subquery` / `from_json_table`, fills it is not
    // visible here — confirm against the converter.
    pub from: String,
    pub from_alias: Option<String>,
    pub from_subquery: Option<Box<DerivedTable>>,
    pub from_args: Option<Vec<Expr>>,
    pub from_json_table: Option<Box<JsonTableSpec>>,
    /// Joins applied to the FROM source.
    pub joins: Vec<JoinClause>,
    pub distinct: bool,
    pub where_clause: Option<Expr>,
    pub order_by: Vec<OrderByItem>,
    pub limit: Option<Expr>,
    pub offset: Option<Expr>,
    pub group_by: Vec<Expr>,
    pub having: Option<Expr>,
}
370
/// JSON-table source description; stored in [`SelectStmt::from_json_table`].
#[derive(Debug, Clone)]
pub struct JsonTableSpec {
    /// Expression producing the input document.
    pub source: Expr,
    // NOTE(review): presumably the row-generating JSON path — confirm.
    pub root_path: String,
    /// Output column definitions.
    pub columns: Vec<JsonTableCol>,
}
377
/// One column entry of a [`JsonTableSpec`]; `Nested` entries contain further
/// entries recursively.
#[derive(Debug, Clone)]
pub enum JsonTableCol {
    /// A named, typed column extracted at `path`.
    Named {
        name: String,
        ty: DataType,
        path: String,
        // NOTE(review): presumably marks an `EXISTS` column (presence test
        // rather than value extraction) — confirm.
        exists: bool,
    },
    // NOTE(review): presumably a `FOR ORDINALITY` row-number column — confirm.
    Ordinality {
        name: String,
    },
    /// A nested group of columns rooted at `path`.
    Nested {
        path: String,
        columns: Vec<JsonTableCol>,
    },
}
394
/// Set operator of a [`CompoundSelect`].
#[derive(Debug, Clone)]
pub enum SetOp {
    Union,
    Intersect,
    Except,
}
401
/// Two query bodies combined by a set operator; wrapped by
/// [`QueryBody::Compound`].
#[derive(Debug, Clone)]
pub struct CompoundSelect {
    pub op: SetOp,
    // NOTE(review): presumably set by the `ALL` modifier (e.g. `UNION ALL`) — confirm.
    pub all: bool,
    pub left: Box<QueryBody>,
    pub right: Box<QueryBody>,
    /// Ordering / limit / offset stored on the compound node itself.
    pub order_by: Vec<OrderByItem>,
    pub limit: Option<Expr>,
    pub offset: Option<Expr>,
}
412
/// Body of a query or CTE: a plain select, a compound select, or a
/// data-modifying statement.
#[derive(Debug, Clone)]
pub enum QueryBody {
    Select(Box<SelectStmt>),
    Compound(Box<CompoundSelect>),
    Insert(Box<InsertStmt>),
    Update(Box<UpdateStmt>),
    Delete(Box<DeleteStmt>),
}
421
/// One common table expression of a [`SelectQuery`].
#[derive(Debug, Clone)]
pub struct CteDefinition {
    /// CTE name.
    pub name: String,
    /// Column aliases declared for the CTE, if any.
    pub column_aliases: Vec<String>,
    /// The CTE's query body.
    pub body: QueryBody,
}
428
/// A full query: its CTEs plus the main body. Payload of [`Statement::Select`].
#[derive(Debug, Clone)]
pub struct SelectQuery {
    /// Common table expressions, in the order stored by the converter.
    pub ctes: Vec<CteDefinition>,
    // NOTE(review): presumably set by `WITH RECURSIVE` — confirm.
    pub recursive: bool,
    /// Main query body.
    pub body: QueryBody,
}
435
/// Payload of [`Statement::Update`]; also usable as a query body through
/// [`QueryBody::Update`].
#[derive(Debug, Clone)]
pub struct UpdateStmt {
    /// Target table name.
    pub table: String,
    /// `(column name, value expression)` assignments.
    pub assignments: Vec<(String, Expr)>,
    pub where_clause: Option<Expr>,
    /// Projection of a RETURNING clause, if any.
    pub returning: Option<Vec<SelectColumn>>,
}
443
/// Payload of [`Statement::Delete`]; also usable as a query body through
/// [`QueryBody::Delete`].
#[derive(Debug, Clone)]
pub struct DeleteStmt {
    /// Target table name.
    pub table: String,
    pub where_clause: Option<Expr>,
    /// Projection of a RETURNING clause, if any.
    pub returning: Option<Vec<SelectColumn>>,
}
450
/// One item of a projection list (SELECT list or RETURNING list).
#[derive(Debug, Clone)]
pub enum SelectColumn {
    AllColumns,
    // NOTE(review): presumably `OLD.*` / `NEW.*` wildcards — confirm where the
    // converter produces them.
    AllFromOld,
    AllFromNew,
    /// An expression with an optional alias.
    Expr { expr: Expr, alias: Option<String> },
}
458
/// One ORDER BY key.
#[derive(Debug, Clone)]
pub struct OrderByItem {
    /// Sort-key expression.
    pub expr: Expr,
    pub descending: bool,
    // NOTE(review): `None` presumably means no explicit `NULLS FIRST` /
    // `NULLS LAST` was given — confirm.
    pub nulls_first: Option<bool>,
}
465
/// Expression node of the internal AST.
///
/// `parse_sql_expr` produces a value of this type, `has_subquery` walks it
/// looking for subqueries, and `count_params` scans it for
/// [`Expr::Parameter`] nodes.
#[derive(Debug, Clone)]
pub enum Expr {
    Literal(Value),
    /// Unqualified column reference.
    Column(String),
    /// Column reference qualified by a table name.
    QualifiedColumn {
        table: String,
        column: String,
    },
    BinaryOp {
        left: Box<Expr>,
        op: BinOp,
        right: Box<Expr>,
    },
    UnaryOp {
        op: UnaryOp,
        expr: Box<Expr>,
    },
    IsNull(Box<Expr>),
    IsNotNull(Box<Expr>),
    Function {
        name: String,
        args: Vec<Expr>,
        /// True for aggregate forms like `COUNT(DISTINCT x)`.
        distinct: bool,
    },
    CountStar,
    /// `expr [NOT] IN (subquery)`.
    InSubquery {
        expr: Box<Expr>,
        subquery: Box<SelectStmt>,
        negated: bool,
    },
    /// `expr [NOT] IN (list)`.
    InList {
        expr: Box<Expr>,
        list: Vec<Expr>,
        negated: bool,
    },
    Exists {
        subquery: Box<SelectStmt>,
        negated: bool,
    },
    ScalarSubquery(Box<SelectStmt>),
    // NOTE(review): presumably a pre-hashed form of `InList` over constant
    // values, with `has_null` recording a NULL member — confirm where it is built.
    InSet {
        expr: Box<Expr>,
        values: rustc_hash::FxHashSet<Value>,
        has_null: bool,
        negated: bool,
    },
    Between {
        expr: Box<Expr>,
        low: Box<Expr>,
        high: Box<Expr>,
        negated: bool,
    },
    Like {
        expr: Box<Expr>,
        pattern: Box<Expr>,
        escape: Option<Box<Expr>>,
        negated: bool,
    },
    /// CASE expression; each `conditions` entry is a `(condition, result)` pair.
    Case {
        operand: Option<Box<Expr>>,
        conditions: Vec<(Expr, Expr)>,
        else_result: Option<Box<Expr>>,
    },
    Coalesce(Vec<Expr>),
    Cast {
        expr: Box<Expr>,
        data_type: DataType,
    },
    /// Bind parameter; `count_params` reports the largest index it finds.
    Parameter(usize),
    WindowFunction {
        name: String,
        args: Vec<Expr>,
        spec: WindowSpec,
    },
    Collate {
        expr: Box<Expr>,
        collation: crate::types::Collation,
    },
    // NOTE(review): the string is presumably a record/table type name — confirm.
    TypedNullRecord(String),
    ArrayLiteral(Vec<Expr>),
    /// Quantified comparison: `left op ANY/ALL (right)`.
    Quantified {
        left: Box<Expr>,
        op: BinOp,
        quantifier: Quantifier,
        right: QuantifiedRhs,
    },
}
554
/// Quantifier of an [`Expr::Quantified`] comparison.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Quantifier {
    Any,
    All,
}
560
/// Right-hand side of an [`Expr::Quantified`] comparison: a subquery or an
/// expression (the variant name suggests an array-valued one).
#[derive(Debug, Clone)]
pub enum QuantifiedRhs {
    Subquery(Box<SelectStmt>),
    Array(Box<Expr>),
}
566
/// Window specification attached to an [`Expr::WindowFunction`].
#[derive(Debug, Clone)]
pub struct WindowSpec {
    /// PARTITION BY expressions.
    pub partition_by: Vec<Expr>,
    /// ORDER BY keys.
    pub order_by: Vec<OrderByItem>,
    /// Frame clause, if any.
    pub frame: Option<WindowFrame>,
}
573
/// Frame clause of a [`WindowSpec`]: the unit plus start and end bounds.
#[derive(Debug, Clone)]
pub struct WindowFrame {
    pub units: WindowFrameUnits,
    pub start: WindowFrameBound,
    pub end: WindowFrameBound,
}
580
/// Unit of a [`WindowFrame`].
#[derive(Debug, Clone, Copy)]
pub enum WindowFrameUnits {
    Rows,
    Range,
    Groups,
}
587
/// One bound of a [`WindowFrame`]; the `Preceding` / `Following` variants
/// carry the offset expression.
#[derive(Debug, Clone)]
pub enum WindowFrameBound {
    UnboundedPreceding,
    Preceding(Box<Expr>),
    CurrentRow,
    Following(Box<Expr>),
    UnboundedFollowing,
}
596
/// Binary operator used by [`Expr::BinaryOp`] and [`Expr::Quantified`].
///
/// NOTE(review): apart from the two `*Tz` variants documented below, the SQL
/// spelling of the `Json*` and `Vector*` operators is not visible in this
/// file — see the expression converter for the mapping.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BinOp {
    // Arithmetic.
    Add,
    Sub,
    Mul,
    Div,
    Mod,
    // Comparison.
    Eq,
    NotEq,
    Lt,
    Gt,
    LtEq,
    GtEq,
    // Boolean connectives.
    And,
    Or,
    Concat,
    // JSON operators.
    JsonGet,
    JsonGetText,
    JsonPath,
    JsonPathText,
    JsonContains,
    JsonContainedBy,
    JsonHasKey,
    JsonHasAnyKey,
    JsonHasAllKeys,
    JsonDeletePath,
    JsonPathExists,
    JsonPathMatch,
    /// `@?_tz` — tz-aware variant of `@?`.
    JsonPathExistsTz,
    /// `@@_tz` — tz-aware variant of `@@`.
    JsonPathMatchTz,
    // Vector distance operators.
    VectorL2,
    VectorInner,
    VectorCosine,
}
633
/// Unary operator used by [`Expr::UnaryOp`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnaryOp {
    Neg,
    Not,
}
639
640pub fn has_subquery(expr: &Expr) -> bool {
641    match expr {
642        Expr::InSubquery { .. } | Expr::Exists { .. } | Expr::ScalarSubquery(_) => true,
643        Expr::BinaryOp { left, right, .. } => has_subquery(left) || has_subquery(right),
644        Expr::UnaryOp { expr, .. } => has_subquery(expr),
645        Expr::IsNull(e) | Expr::IsNotNull(e) => has_subquery(e),
646        Expr::InList { expr, list, .. } => has_subquery(expr) || list.iter().any(has_subquery),
647        Expr::InSet { expr, .. } => has_subquery(expr),
648        Expr::Between {
649            expr, low, high, ..
650        } => has_subquery(expr) || has_subquery(low) || has_subquery(high),
651        Expr::Like {
652            expr,
653            pattern,
654            escape,
655            ..
656        } => {
657            has_subquery(expr)
658                || has_subquery(pattern)
659                || escape.as_ref().is_some_and(|e| has_subquery(e))
660        }
661        Expr::Case {
662            operand,
663            conditions,
664            else_result,
665        } => {
666            operand.as_ref().is_some_and(|e| has_subquery(e))
667                || conditions
668                    .iter()
669                    .any(|(c, r)| has_subquery(c) || has_subquery(r))
670                || else_result.as_ref().is_some_and(|e| has_subquery(e))
671        }
672        Expr::Coalesce(args) | Expr::Function { args, .. } => args.iter().any(has_subquery),
673        Expr::Cast { expr, .. } => has_subquery(expr),
674        Expr::ArrayLiteral(elems) => elems.iter().any(has_subquery),
675        Expr::Quantified { left, right, .. } => {
676            has_subquery(left)
677                || match right {
678                    QuantifiedRhs::Subquery(_) => true,
679                    QuantifiedRhs::Array(e) => has_subquery(e),
680                }
681        }
682        _ => false,
683    }
684}
685
686pub fn parse_sql_expr(sql: &str) -> Result<Expr> {
687    let sp_expr = crate::dialect::parse_expr(sql).map_err(|e| SqlError::Parse(e.to_string()))?;
688    convert_expr(&sp_expr)
689}
690
691pub fn parse_sql(sql: &str) -> Result<Statement> {
692    if let Some(stmt) = try_parse_refresh_matview(sql) {
693        return stmt;
694    }
695    let (rewritten, no_data_flags) = strip_matview_with_no_data(sql);
696    let stmts =
697        crate::dialect::parse_statements(&rewritten).map_err(|e| SqlError::Parse(e.to_string()))?;
698
699    if stmts.is_empty() {
700        return Err(SqlError::Parse("empty SQL".into()));
701    }
702    if stmts.len() > 1 {
703        return Err(SqlError::Unsupported("multiple statements".into()));
704    }
705
706    let mut converted = convert_statement(stmts.into_iter().next().unwrap())?;
707    apply_no_data_flags(std::slice::from_mut(&mut converted), &no_data_flags);
708    Ok(converted)
709}
710
711pub fn parse_sql_multi(sql: &str) -> Result<Vec<Statement>> {
712    let (rewritten, no_data_flags) = strip_matview_with_no_data(sql);
713    let mut out: Vec<Statement> = Vec::new();
714    for (start, end) in split_statement_spans(&rewritten) {
715        let stmt_sql = &rewritten[start..end];
716        if let Some(parsed) = try_parse_refresh_matview(stmt_sql) {
717            out.push(parsed?);
718        } else {
719            let raw = crate::dialect::parse_statements(stmt_sql)
720                .map_err(|e| SqlError::Parse(e.to_string()))?;
721            for s in raw {
722                out.push(convert_statement(s)?);
723            }
724        }
725    }
726    if out.is_empty() {
727        return Err(SqlError::Parse("empty SQL".into()));
728    }
729    apply_no_data_flags(&mut out, &no_data_flags);
730    Ok(out)
731}
732
/// Splits `sql` at top-level `;` and returns the byte span `(start, end)` of
/// every statement whose text is not blank. `end` is exclusive and never
/// includes the terminating `;`.
///
/// A `;` is not treated as a separator inside `--` line comments, `/* */`
/// block comments (non-nested), single- or double-quoted text (a doubled
/// quote is an escaped quote), or `$tag$ ... $tag$` dollar-quoted text.
fn split_statement_spans(sql: &str) -> Vec<(usize, usize)> {
    /// Index of the newline ending the line comment at `from`, or the input length.
    fn after_line_comment(src: &[u8], from: usize) -> usize {
        src[from..]
            .iter()
            .position(|&c| c == b'\n')
            .map_or(src.len(), |off| from + off)
    }

    /// Index just past the `*/` closing the block comment opened at `open`.
    /// When no `*/` follows, scanning stops once fewer than two bytes remain.
    fn after_block_comment(src: &[u8], open: usize) -> usize {
        let mut at = open + 2;
        while at + 1 < src.len() {
            if src[at] == b'*' && src[at + 1] == b'/' {
                return at + 2;
            }
            at += 1;
        }
        at
    }

    /// Index just past the closing `quote` of the quoted run opened at `open`,
    /// or the input length when the run is unterminated.
    fn after_quoted(src: &[u8], open: usize, quote: u8) -> usize {
        let mut at = open + 1;
        while at < src.len() {
            if src[at] != quote {
                at += 1;
            } else if src.get(at + 1) == Some(&quote) {
                // Doubled quote: an escaped quote character, stay inside.
                at += 2;
            } else {
                return at + 1;
            }
        }
        at
    }

    /// Handles a `$` at `open`. If it starts a `$tag$` opener, returns the
    /// index just past the matching closer (or where the search gave up);
    /// otherwise returns `open + 1`.
    fn after_dollar(src: &[u8], open: usize) -> usize {
        let tag_end = src[open + 1..]
            .iter()
            .position(|c| !(c.is_ascii_alphanumeric() || *c == b'_'))
            .map_or(src.len(), |off| open + 1 + off);
        if src.get(tag_end) != Some(&b'$') {
            return open + 1;
        }
        let tag = &src[open..=tag_end];
        let mut at = tag_end + 1;
        while at + tag.len() <= src.len() {
            if &src[at..at + tag.len()] == tag {
                return at + tag.len();
            }
            at += 1;
        }
        at
    }

    let src = sql.as_bytes();
    let mut spans: Vec<(usize, usize)> = Vec::new();
    let mut begin = 0usize;
    let mut pos = 0usize;

    while pos < src.len() {
        let lookahead = src.get(pos + 1).copied();
        pos = match (src[pos], lookahead) {
            (b'-', Some(b'-')) => after_line_comment(src, pos),
            (b'/', Some(b'*')) => after_block_comment(src, pos),
            (b'\'', _) => after_quoted(src, pos, b'\''),
            (b'"', _) => after_quoted(src, pos, b'"'),
            (b'$', _) => after_dollar(src, pos),
            (b';', _) => {
                if !sql[begin..pos].trim().is_empty() {
                    spans.push((begin, pos));
                }
                begin = pos + 1;
                pos + 1
            }
            _ => pos + 1,
        };
    }

    // Trailing statement without a terminating `;`.
    if begin < src.len() && !sql[begin..].trim().is_empty() {
        spans.push((begin, src.len()));
    }
    spans
}
823
824fn apply_no_data_flags(stmts: &mut [Statement], flags: &[bool]) {
825    let mut iter = flags.iter();
826    for stmt in stmts.iter_mut() {
827        if let Statement::CreateMaterializedView(boxed) = stmt {
828            if let Some(&no_data) = iter.next() {
829                boxed.with_data = !no_data;
830            }
831        }
832    }
833}
834
835/// sqlparser 0.61 doesn't natively parse REFRESH MATERIALIZED VIEW [CONCURRENTLY] <name>.
836fn try_parse_refresh_matview(sql: &str) -> Option<Result<Statement>> {
837    let trimmed = sql.trim().trim_end_matches(';').trim();
838    let lower = trimmed.to_ascii_lowercase();
839    let after = lower.strip_prefix("refresh materialized view")?;
840    let after = after.trim_start();
841    let (concurrently, rest_lower) = match after.strip_prefix("concurrently") {
842        Some(r) if r.starts_with(char::is_whitespace) => (true, r.trim_start()),
843        _ => (false, after),
844    };
845    if rest_lower.is_empty() {
846        return Some(Err(SqlError::Parse(
847            "REFRESH MATERIALIZED VIEW requires a name".into(),
848        )));
849    }
850    let name = rest_lower
851        .split_whitespace()
852        .next()
853        .unwrap_or("")
854        .trim_matches(|c: char| c == '"' || c == ';')
855        .to_string();
856    if name.is_empty() {
857        return Some(Err(SqlError::Parse(
858            "REFRESH MATERIALIZED VIEW requires a name".into(),
859        )));
860    }
861    Some(Ok(Statement::RefreshMaterializedView(RefreshMatviewStmt {
862        name,
863        concurrently,
864    })))
865}
866
/// Copies `sql` into a new string, removing a trailing `WITH NO DATA` from
/// every statement that `statement_is_create_matview` recognises.
///
/// Returns the rewritten SQL together with one flag per recognised
/// `CREATE ... MATERIALIZED VIEW` statement, in statement order: `true` when
/// the suffix was found and removed, `false` otherwise. Other statements
/// contribute no flag.
///
/// Statements are delimited by `;`. A `;` inside `--` line comments, `/* */`
/// block comments, single- or double-quoted text (doubled quote = escaped
/// quote), or `$tag$ ... $tag$` dollar-quoted text is copied verbatim and does
/// not end a statement.
fn strip_matview_with_no_data(sql: &str) -> (String, Vec<bool>) {
    let bytes = sql.as_bytes();
    let mut out = String::with_capacity(sql.len());
    let mut flags: Vec<bool> = Vec::new();
    // Byte offset in `out` (not in `sql`) where the current statement begins.
    let mut stmt_start = 0usize;
    let mut i = 0usize;

    // Copies the character starting at byte `*i` of `sql` into `out` and
    // advances `*i` past it. ASCII is pushed directly; otherwise the UTF-8
    // sequence length is derived from the lead byte and clamped to the input end.
    let push_one = |out: &mut String, i: &mut usize, bytes: &[u8], sql: &str| {
        let b = bytes[*i];
        if b < 0x80 {
            out.push(b as char);
            *i += 1;
        } else {
            let len = if b >= 0xF0 {
                4
            } else if b >= 0xE0 {
                3
            } else if b >= 0xC0 {
                2
            } else {
                1
            };
            let end = (*i + len).min(bytes.len());
            out.push_str(&sql[*i..end]);
            *i = end;
        }
    };

    while i < bytes.len() {
        let b = bytes[i];
        // `--` line comment: copy up to (not including) the newline.
        if b == b'-' && i + 1 < bytes.len() && bytes[i + 1] == b'-' {
            while i < bytes.len() && bytes[i] != b'\n' {
                push_one(&mut out, &mut i, bytes, sql);
            }
            continue;
        }
        // `/* ... */` block comment (non-nested): copy through the closing `*/`.
        if b == b'/' && i + 1 < bytes.len() && bytes[i + 1] == b'*' {
            out.push('/');
            out.push('*');
            i += 2;
            while i + 1 < bytes.len() && !(bytes[i] == b'*' && bytes[i + 1] == b'/') {
                push_one(&mut out, &mut i, bytes, sql);
            }
            // Only true when a closing `*/` was actually found.
            if i + 1 < bytes.len() {
                out.push('*');
                out.push('/');
                i += 2;
            }
            continue;
        }
        // Single-quoted text; `''` is an escaped quote and stays inside.
        if b == b'\'' {
            out.push('\'');
            i += 1;
            while i < bytes.len() {
                if bytes[i] == b'\'' {
                    if i + 1 < bytes.len() && bytes[i + 1] == b'\'' {
                        out.push('\'');
                        out.push('\'');
                        i += 2;
                    } else {
                        out.push('\'');
                        i += 1;
                        break;
                    }
                } else {
                    push_one(&mut out, &mut i, bytes, sql);
                }
            }
            continue;
        }
        // Double-quoted text; `""` is an escaped quote and stays inside.
        if b == b'"' {
            out.push('"');
            i += 1;
            while i < bytes.len() {
                if bytes[i] == b'"' {
                    if i + 1 < bytes.len() && bytes[i + 1] == b'"' {
                        out.push('"');
                        out.push('"');
                        i += 2;
                    } else {
                        out.push('"');
                        i += 1;
                        break;
                    }
                } else {
                    push_one(&mut out, &mut i, bytes, sql);
                }
            }
            continue;
        }
        // `$tag$` opener (tag = ASCII alphanumerics / `_`, possibly empty):
        // copy everything through the matching `$tag$` closer.
        if b == b'$' {
            let mut j = i + 1;
            while j < bytes.len() && (bytes[j].is_ascii_alphanumeric() || bytes[j] == b'_') {
                j += 1;
            }
            if j < bytes.len() && bytes[j] == b'$' {
                // Length of the full delimiter, both `$` included.
                let tag_len = j - i + 1;
                out.push_str(&sql[i..=j]);
                i = j + 1;
                while i + tag_len <= bytes.len() {
                    // `bytes[(j - tag_len + 1)..=j]` is the opening delimiter.
                    if bytes[i..i + tag_len] == bytes[(j - tag_len + 1)..=j] {
                        out.push_str(&sql[i..i + tag_len]);
                        i += tag_len;
                        break;
                    }
                    push_one(&mut out, &mut i, bytes, sql);
                }
                continue;
            }
            // A lone `$` that does not open a dollar-quoted run.
            out.push('$');
            i += 1;
            continue;
        }
        // Statement terminator: inspect the statement text accumulated so far.
        if b == b';' {
            let stmt_text = &out[stmt_start..];
            if statement_is_create_matview(stmt_text) {
                match find_with_no_data_suffix(stmt_text) {
                    Some(truncate_to) => {
                        // `truncate_to` is relative to the statement start.
                        out.truncate(stmt_start + truncate_to);
                        flags.push(true);
                    }
                    None => flags.push(false),
                }
            }
            out.push(';');
            i += 1;
            // Whitespace right after `;` is copied before the next statement
            // start is recorded, so it is not part of the next statement's text.
            while i < bytes.len() && bytes[i].is_ascii_whitespace() {
                out.push(bytes[i] as char);
                i += 1;
            }
            stmt_start = out.len();
            continue;
        }
        push_one(&mut out, &mut i, bytes, sql);
    }

    // Final statement without a terminating `;`.
    let stmt_text = &out[stmt_start..];
    if !stmt_text.trim().is_empty() && statement_is_create_matview(stmt_text) {
        match find_with_no_data_suffix(stmt_text) {
            Some(truncate_to) => {
                out.truncate(stmt_start + truncate_to);
                flags.push(true);
            }
            None => flags.push(false),
        }
    }

    (out, flags)
}
1016
1017fn statement_is_create_matview(stmt: &str) -> bool {
1018    let stripped = strip_leading_ws_and_comments(stmt);
1019    let lower = stripped.to_ascii_lowercase();
1020    let s = lower.as_str();
1021    let after_create = match strip_kw_lower(s, "create") {
1022        Some(r) => r,
1023        None => return false,
1024    };
1025    let after_orrep = if let Some(r) = strip_kw_lower(after_create, "or") {
1026        if let Some(r2) = strip_kw_lower(r, "replace") {
1027            r2
1028        } else {
1029            after_create
1030        }
1031    } else {
1032        after_create
1033    };
1034    let after_temp = if let Some(r) = strip_kw_lower(after_orrep, "temporary") {
1035        r
1036    } else if let Some(r) = strip_kw_lower(after_orrep, "temp") {
1037        r
1038    } else {
1039        after_orrep
1040    };
1041    let after_mv = match strip_kw_lower(after_temp, "materialized") {
1042        Some(r) => r,
1043        None => return false,
1044    };
1045    strip_kw_lower(after_mv, "view").is_some()
1046}
1047
/// Returns `s` with leading whitespace, `--` line comments and `/* */` block
/// comments removed.
///
/// Yields `""` when the input ends inside a comment: a line comment without a
/// newline or a block comment without `*/`.
fn strip_leading_ws_and_comments(s: &str) -> &str {
    let mut cursor = s.trim_start();
    loop {
        // `tail` is what follows the comment at the cursor, or `None` when
        // that comment runs to the end of the input.
        let tail = if let Some(body) = cursor.strip_prefix("--") {
            body.find('\n').map(|nl| &body[nl..])
        } else if let Some(body) = cursor.strip_prefix("/*") {
            body.find("*/").map(|close| &body[close + 2..])
        } else {
            return cursor;
        };
        match tail {
            Some(next) => cursor = next.trim_start(),
            None => return "",
        }
    }
}
1069
/// Strips the keyword `kw` from the front of `s` and returns what follows,
/// with leading whitespace removed.
///
/// Returns `None` when `s` does not start with `kw`, or when the keyword is
/// immediately followed by a non-whitespace character. The comparison is
/// exact, so callers pass already-lowercased text.
fn strip_kw_lower<'a>(s: &'a str, kw: &str) -> Option<&'a str> {
    s.strip_prefix(kw)
        // Word boundary: end of input or whitespace right after the keyword.
        .filter(|tail| tail.chars().next().map_or(true, char::is_whitespace))
        .map(str::trim_start)
}
1078
/// Looks for a trailing `WITH NO DATA` (ASCII case-insensitive) at the end of
/// `stmt`, ignoring trailing whitespace.
///
/// The three words must be separated by at least one ASCII whitespace
/// character, and `WITH` itself must be preceded by at least one. On a match,
/// returns the byte offset at which to truncate `stmt` so that the suffix and
/// the whitespace before it are removed. Otherwise returns `None`.
fn find_with_no_data_suffix(stmt: &str) -> Option<usize> {
    /// Start of the run of ASCII whitespace that ends at `end`.
    fn ws_run_start(bytes: &[u8], end: usize) -> usize {
        let mut at = end;
        while at > 0 && bytes[at - 1].is_ascii_whitespace() {
            at -= 1;
        }
        at
    }

    let bytes = stmt.as_bytes();
    // Words are peeled off right to left.
    let words: [&[u8]; 3] = [b"data", b"no", b"with"];

    let mut cut = ws_run_start(bytes, bytes.len());
    for &word in &words {
        let word_start = cut.checked_sub(word.len())?;
        if !bytes[word_start..cut].eq_ignore_ascii_case(word) {
            return None;
        }
        cut = ws_run_start(bytes, word_start);
        // Every word must have whitespace directly before it.
        if cut == word_start {
            return None;
        }
    }
    Some(cut)
}
1120
1121pub fn count_params(stmt: &Statement) -> usize {
1122    let mut max_idx = 0usize;
1123    visit_exprs_stmt(stmt, &mut |e| {
1124        if let Expr::Parameter(n) = e {
1125            max_idx = max_idx.max(*n);
1126        }
1127    });
1128    max_idx
1129}
1130
1131fn visit_exprs_stmt(stmt: &Statement, visitor: &mut impl FnMut(&Expr)) {
1132    match stmt {
1133        Statement::Select(sq) => {
1134            for cte in &sq.ctes {
1135                visit_exprs_query_body(&cte.body, visitor);
1136            }
1137            visit_exprs_query_body(&sq.body, visitor);
1138        }
1139        Statement::Insert(ins) => match &ins.source {
1140            InsertSource::Values(rows) => {
1141                for row in rows {
1142                    for e in row {
1143                        visit_expr(e, visitor);
1144                    }
1145                }
1146            }
1147            InsertSource::Select(sq) => {
1148                for cte in &sq.ctes {
1149                    visit_exprs_query_body(&cte.body, visitor);
1150                }
1151                visit_exprs_query_body(&sq.body, visitor);
1152            }
1153        },
1154        Statement::Update(upd) => {
1155            for (_, e) in &upd.assignments {
1156                visit_expr(e, visitor);
1157            }
1158            if let Some(w) = &upd.where_clause {
1159                visit_expr(w, visitor);
1160            }
1161        }
1162        Statement::Delete(del) => {
1163            if let Some(w) = &del.where_clause {
1164                visit_expr(w, visitor);
1165            }
1166        }
1167        Statement::Explain(inner) => visit_exprs_stmt(inner, visitor),
1168        _ => {}
1169    }
1170}
1171
1172fn visit_exprs_query_body(body: &QueryBody, visitor: &mut impl FnMut(&Expr)) {
1173    match body {
1174        QueryBody::Select(sel) => visit_exprs_select(sel, visitor),
1175        QueryBody::Compound(comp) => {
1176            visit_exprs_query_body(&comp.left, visitor);
1177            visit_exprs_query_body(&comp.right, visitor);
1178            for o in &comp.order_by {
1179                visit_expr(&o.expr, visitor);
1180            }
1181            if let Some(l) = &comp.limit {
1182                visit_expr(l, visitor);
1183            }
1184            if let Some(o) = &comp.offset {
1185                visit_expr(o, visitor);
1186            }
1187        }
1188        QueryBody::Insert(ins) => visit_exprs_stmt(&Statement::Insert((**ins).clone()), visitor),
1189        QueryBody::Update(upd) => visit_exprs_stmt(&Statement::Update((**upd).clone()), visitor),
1190        QueryBody::Delete(del) => visit_exprs_stmt(&Statement::Delete((**del).clone()), visitor),
1191    }
1192}
1193
1194fn visit_exprs_select(sel: &SelectStmt, visitor: &mut impl FnMut(&Expr)) {
1195    for col in &sel.columns {
1196        if let SelectColumn::Expr { expr, .. } = col {
1197            visit_expr(expr, visitor);
1198        }
1199    }
1200    for j in &sel.joins {
1201        if let Some(on) = &j.on_clause {
1202            visit_expr(on, visitor);
1203        }
1204    }
1205    if let Some(w) = &sel.where_clause {
1206        visit_expr(w, visitor);
1207    }
1208    for o in &sel.order_by {
1209        visit_expr(&o.expr, visitor);
1210    }
1211    if let Some(l) = &sel.limit {
1212        visit_expr(l, visitor);
1213    }
1214    if let Some(o) = &sel.offset {
1215        visit_expr(o, visitor);
1216    }
1217    for g in &sel.group_by {
1218        visit_expr(g, visitor);
1219    }
1220    if let Some(h) = &sel.having {
1221        visit_expr(h, visitor);
1222    }
1223}
1224
1225fn visit_expr(expr: &Expr, visitor: &mut impl FnMut(&Expr)) {
1226    visitor(expr);
1227    match expr {
1228        Expr::BinaryOp { left, right, .. } => {
1229            visit_expr(left, visitor);
1230            visit_expr(right, visitor);
1231        }
1232        Expr::UnaryOp { expr: e, .. } | Expr::IsNull(e) | Expr::IsNotNull(e) => {
1233            visit_expr(e, visitor);
1234        }
1235        Expr::Function { args, .. } | Expr::Coalesce(args) => {
1236            for a in args {
1237                visit_expr(a, visitor);
1238            }
1239        }
1240        Expr::InSubquery {
1241            expr: e, subquery, ..
1242        } => {
1243            visit_expr(e, visitor);
1244            visit_exprs_select(subquery, visitor);
1245        }
1246        Expr::InList { expr: e, list, .. } => {
1247            visit_expr(e, visitor);
1248            for l in list {
1249                visit_expr(l, visitor);
1250            }
1251        }
1252        Expr::Exists { subquery, .. } => visit_exprs_select(subquery, visitor),
1253        Expr::ScalarSubquery(sq) => visit_exprs_select(sq, visitor),
1254        Expr::InSet { expr: e, .. } => visit_expr(e, visitor),
1255        Expr::Between {
1256            expr: e, low, high, ..
1257        } => {
1258            visit_expr(e, visitor);
1259            visit_expr(low, visitor);
1260            visit_expr(high, visitor);
1261        }
1262        Expr::Like {
1263            expr: e,
1264            pattern,
1265            escape,
1266            ..
1267        } => {
1268            visit_expr(e, visitor);
1269            visit_expr(pattern, visitor);
1270            if let Some(esc) = escape {
1271                visit_expr(esc, visitor);
1272            }
1273        }
1274        Expr::Case {
1275            operand,
1276            conditions,
1277            else_result,
1278        } => {
1279            if let Some(op) = operand {
1280                visit_expr(op, visitor);
1281            }
1282            for (cond, then) in conditions {
1283                visit_expr(cond, visitor);
1284                visit_expr(then, visitor);
1285            }
1286            if let Some(el) = else_result {
1287                visit_expr(el, visitor);
1288            }
1289        }
1290        Expr::Cast { expr: e, .. } => visit_expr(e, visitor),
1291        Expr::Collate { expr: e, .. } => visit_expr(e, visitor),
1292        Expr::WindowFunction { args, spec, .. } => {
1293            for a in args {
1294                visit_expr(a, visitor);
1295            }
1296            for p in &spec.partition_by {
1297                visit_expr(p, visitor);
1298            }
1299            for o in &spec.order_by {
1300                visit_expr(&o.expr, visitor);
1301            }
1302            if let Some(ref frame) = spec.frame {
1303                if let WindowFrameBound::Preceding(e) | WindowFrameBound::Following(e) =
1304                    &frame.start
1305                {
1306                    visit_expr(e, visitor);
1307                }
1308                if let WindowFrameBound::Preceding(e) | WindowFrameBound::Following(e) = &frame.end
1309                {
1310                    visit_expr(e, visitor);
1311                }
1312            }
1313        }
1314        Expr::ArrayLiteral(elems) => {
1315            for e in elems {
1316                visit_expr(e, visitor);
1317            }
1318        }
1319        Expr::Quantified { left, right, .. } => {
1320            visit_expr(left, visitor);
1321            match right {
1322                QuantifiedRhs::Subquery(sq) => visit_exprs_select(sq, visitor),
1323                QuantifiedRhs::Array(e) => visit_expr(e, visitor),
1324            }
1325        }
1326        Expr::Literal(_)
1327        | Expr::Column(_)
1328        | Expr::QualifiedColumn { .. }
1329        | Expr::CountStar
1330        | Expr::Parameter(_)
1331        | Expr::TypedNullRecord(_) => {}
1332    }
1333}
1334
1335fn convert_statement(stmt: sp::Statement) -> Result<Statement> {
1336    match stmt {
1337        sp::Statement::CreateTable(ct) => convert_create_table(ct),
1338        sp::Statement::CreateIndex(ci) => convert_create_index(ci),
1339        sp::Statement::Drop {
1340            object_type: sp::ObjectType::Table,
1341            if_exists,
1342            names,
1343            ..
1344        } => {
1345            if names.len() != 1 {
1346                return Err(SqlError::Unsupported("multi-table DROP".into()));
1347            }
1348            Ok(Statement::DropTable(DropTableStmt {
1349                name: object_name_to_string(&names[0]),
1350                if_exists,
1351            }))
1352        }
1353        sp::Statement::Drop {
1354            object_type: sp::ObjectType::Index,
1355            if_exists,
1356            names,
1357            ..
1358        } => {
1359            if names.len() != 1 {
1360                return Err(SqlError::Unsupported("multi-index DROP".into()));
1361            }
1362            Ok(Statement::DropIndex(DropIndexStmt {
1363                index_name: object_name_to_string(&names[0]),
1364                if_exists,
1365            }))
1366        }
1367        sp::Statement::CreateView(cv) => convert_create_view(cv),
1368        sp::Statement::Drop {
1369            object_type: sp::ObjectType::View,
1370            if_exists,
1371            names,
1372            ..
1373        } => {
1374            if names.len() != 1 {
1375                return Err(SqlError::Unsupported("multi-view DROP".into()));
1376            }
1377            Ok(Statement::DropView(DropViewStmt {
1378                name: object_name_to_string(&names[0]),
1379                if_exists,
1380            }))
1381        }
1382        sp::Statement::Drop {
1383            object_type: sp::ObjectType::MaterializedView,
1384            if_exists,
1385            names,
1386            cascade,
1387            ..
1388        } => {
1389            if names.len() != 1 {
1390                return Err(SqlError::Unsupported("multi-matview DROP".into()));
1391            }
1392            Ok(Statement::DropMaterializedView(DropMatviewStmt {
1393                name: object_name_to_string(&names[0]),
1394                if_exists,
1395                cascade,
1396            }))
1397        }
1398        sp::Statement::CreateTrigger(ct) => convert_create_trigger(ct),
1399        sp::Statement::DropTrigger(dt) => Ok(Statement::DropTrigger(DropTriggerStmt {
1400            name: object_name_to_string(&dt.trigger_name),
1401            table: dt.table_name.as_ref().map(object_name_to_string),
1402            if_exists: dt.if_exists,
1403        })),
1404        sp::Statement::AlterTable(at) => convert_alter_table(at),
1405        sp::Statement::Insert(insert) => convert_insert(insert),
1406        sp::Statement::Query(query) => convert_query(*query),
1407        sp::Statement::Update(update) => convert_update(update),
1408        sp::Statement::Delete(delete) => convert_delete(delete),
1409        sp::Statement::Truncate(t) => convert_truncate(t),
1410        sp::Statement::StartTransaction { modes, .. } => {
1411            let mut access_mode = BeginAccessMode::Default;
1412            for mode in modes {
1413                if let sp::TransactionMode::AccessMode(am) = mode {
1414                    access_mode = match am {
1415                        sp::TransactionAccessMode::ReadOnly => BeginAccessMode::ReadOnly,
1416                        sp::TransactionAccessMode::ReadWrite => BeginAccessMode::ReadWrite,
1417                    };
1418                }
1419            }
1420            Ok(Statement::Begin { access_mode })
1421        }
1422        sp::Statement::Commit { chain: true, .. } => {
1423            Err(SqlError::Unsupported("COMMIT AND CHAIN".into()))
1424        }
1425        sp::Statement::Commit { .. } => Ok(Statement::Commit),
1426        sp::Statement::Rollback { chain: true, .. } => {
1427            Err(SqlError::Unsupported("ROLLBACK AND CHAIN".into()))
1428        }
1429        sp::Statement::Rollback {
1430            savepoint: Some(name),
1431            ..
1432        } => Ok(Statement::RollbackTo(name.value.to_ascii_lowercase())),
1433        sp::Statement::Rollback { .. } => Ok(Statement::Rollback),
1434        sp::Statement::Savepoint { name } => {
1435            Ok(Statement::Savepoint(name.value.to_ascii_lowercase()))
1436        }
1437        sp::Statement::ReleaseSavepoint { name } => {
1438            Ok(Statement::ReleaseSavepoint(name.value.to_ascii_lowercase()))
1439        }
1440        sp::Statement::Set(sp::Set::SetTimeZone { value, .. }) => {
1441            // Accept a string literal or bare identifier (PG allows `SET TIME ZONE UTC`).
1442            let zone = match value {
1443                sp::Expr::Value(v) => match &v.value {
1444                    sp::Value::SingleQuotedString(s) => s.clone(),
1445                    sp::Value::DoubleQuotedString(s) => s.clone(),
1446                    other => other.to_string(),
1447                },
1448                sp::Expr::Identifier(ident) => ident.value.clone(),
1449                other => {
1450                    return Err(SqlError::Parse(format!(
1451                        "SET TIME ZONE expects a string literal or identifier, got: {other}"
1452                    )))
1453                }
1454            };
1455            Ok(Statement::SetTimezone(zone))
1456        }
1457        sp::Statement::Explain {
1458            statement, analyze, ..
1459        } => {
1460            if analyze {
1461                return Err(SqlError::Unsupported("EXPLAIN ANALYZE".into()));
1462            }
1463            let inner = convert_statement(*statement)?;
1464            Ok(Statement::Explain(Box::new(inner)))
1465        }
1466        _ => Err(SqlError::Unsupported(format!("statement type: {}", stmt))),
1467    }
1468}
1469
1470/// Returns (ColumnSpec, Option<ForeignKeyDef>, was_inline_pk, was_unique).
1471fn convert_column_def(
1472    col_def: &sp::ColumnDef,
1473) -> Result<(ColumnSpec, Option<ForeignKeyDef>, bool, bool)> {
1474    let col_name = col_def.name.value.clone();
1475    let data_type = convert_data_type(&col_def.data_type)?;
1476    let mut nullable = true;
1477    let mut is_primary_key = false;
1478    let mut is_unique = false;
1479    let mut default_expr = None;
1480    let mut default_sql = None;
1481    let mut check_expr = None;
1482    let mut check_sql = None;
1483    let mut check_name = None;
1484    let mut generated_expr = None;
1485    let mut generated_sql = None;
1486    let mut generated_kind = None;
1487    let mut fk_def = None;
1488    let mut collation = crate::types::Collation::Binary;
1489
1490    for opt in &col_def.options {
1491        match &opt.option {
1492            sp::ColumnOption::Collation(name) => {
1493                let coll_name = object_name_to_string(name);
1494                collation = crate::types::Collation::from_name(&coll_name).ok_or_else(|| {
1495                    SqlError::Unsupported(format!(
1496                        "collation '{coll_name}' not supported (BINARY/NOCASE/RTRIM only)"
1497                    ))
1498                })?;
1499            }
1500            sp::ColumnOption::NotNull => nullable = false,
1501            sp::ColumnOption::Null => nullable = true,
1502            sp::ColumnOption::PrimaryKey(_) => {
1503                is_primary_key = true;
1504                nullable = false;
1505            }
1506            sp::ColumnOption::Unique(_) => is_unique = true,
1507            sp::ColumnOption::Default(expr) => {
1508                default_sql = Some(expr.to_string());
1509                default_expr = Some(convert_expr(expr)?);
1510            }
1511            sp::ColumnOption::Check(check) => {
1512                check_sql = Some(check.expr.to_string());
1513                let converted = convert_expr(&check.expr)?;
1514                if has_subquery(&converted) {
1515                    return Err(SqlError::Unsupported("subquery in CHECK constraint".into()));
1516                }
1517                check_expr = Some(converted);
1518                check_name = check.name.as_ref().map(|n| n.value.clone());
1519            }
1520            sp::ColumnOption::ForeignKey(fk) => {
1521                let (on_delete, on_update) = convert_fk_actions(&fk.on_delete, &fk.on_update)?;
1522                let (deferrable, initially_deferred) =
1523                    convert_fk_characteristics(&fk.characteristics);
1524                let ftable = object_name_to_string(&fk.foreign_table).to_ascii_lowercase();
1525                let referred: Vec<String> = fk
1526                    .referred_columns
1527                    .iter()
1528                    .map(|i| i.value.to_ascii_lowercase())
1529                    .collect();
1530                fk_def = Some(ForeignKeyDef {
1531                    name: fk.name.as_ref().map(|n| n.value.clone()),
1532                    columns: vec![col_name.to_ascii_lowercase()],
1533                    foreign_table: ftable,
1534                    referred_columns: referred,
1535                    on_delete,
1536                    on_update,
1537                    deferrable,
1538                    initially_deferred,
1539                });
1540            }
1541            sp::ColumnOption::Generated {
1542                generation_expr,
1543                generation_expr_mode,
1544                sequence_options: _,
1545                ..
1546            } => {
1547                let Some(expr) = generation_expr else {
1548                    return Err(SqlError::Unsupported(
1549                        "identity columns not yet supported; use INTEGER PRIMARY KEY for autoincrement".into(),
1550                    ));
1551                };
1552                let mode = generation_expr_mode.unwrap_or(sp::GeneratedExpressionMode::Virtual);
1553                let converted = convert_expr(expr)?;
1554                reject_aggregate_or_window(expr, "GENERATED")?;
1555                if has_subquery(&converted) {
1556                    return Err(SqlError::Unsupported(
1557                        "subquery in GENERATED expression".into(),
1558                    ));
1559                }
1560                reject_volatile_in_generated(&converted)?;
1561                generated_sql = Some(expr.to_string());
1562                generated_expr = Some(converted);
1563                generated_kind = Some(match mode {
1564                    sp::GeneratedExpressionMode::Stored => GeneratedKind::Stored,
1565                    sp::GeneratedExpressionMode::Virtual => GeneratedKind::Virtual,
1566                });
1567            }
1568            _ => {}
1569        }
1570    }
1571
1572    if generated_kind.is_some() {
1573        if default_expr.is_some() {
1574            return Err(SqlError::Unsupported(
1575                "DEFAULT and GENERATED cannot be combined".into(),
1576            ));
1577        }
1578        if is_primary_key {
1579            return Err(SqlError::Unsupported(
1580                "GENERATED column cannot be PRIMARY KEY".into(),
1581            ));
1582        }
1583    }
1584
1585    let spec = ColumnSpec {
1586        name: col_name,
1587        data_type,
1588        nullable,
1589        is_primary_key,
1590        default_expr,
1591        default_sql,
1592        check_expr,
1593        check_sql,
1594        check_name,
1595        generated_expr,
1596        generated_sql,
1597        generated_kind,
1598        collation,
1599    };
1600    Ok((spec, fk_def, is_primary_key, is_unique))
1601}
1602
1603fn reject_volatile_in_generated(expr: &Expr) -> Result<()> {
1604    fn walk(e: &Expr) -> Result<()> {
1605        match e {
1606            Expr::Function { name, args, .. } => {
1607                let upper = name.to_ascii_uppercase();
1608                if crate::eval::is_volatile_function_expr(&upper, args) {
1609                    return Err(SqlError::Unsupported(format!(
1610                        "volatile function {name}() not allowed in GENERATED expression"
1611                    )));
1612                }
1613                for a in args {
1614                    walk(a)?;
1615                }
1616                Ok(())
1617            }
1618            Expr::BinaryOp { left, right, .. } => {
1619                walk(left)?;
1620                walk(right)
1621            }
1622            Expr::UnaryOp { expr, .. } => walk(expr),
1623            Expr::Cast { expr, .. } => walk(expr),
1624            Expr::Case {
1625                operand,
1626                conditions,
1627                else_result,
1628            } => {
1629                if let Some(o) = operand {
1630                    walk(o)?;
1631                }
1632                for (cond, res) in conditions {
1633                    walk(cond)?;
1634                    walk(res)?;
1635                }
1636                if let Some(e) = else_result {
1637                    walk(e)?;
1638                }
1639                Ok(())
1640            }
1641            Expr::Coalesce(items) => items.iter().try_for_each(walk),
1642            _ => Ok(()),
1643        }
1644    }
1645    walk(expr)
1646}
1647
1648fn convert_create_table(ct: sp::CreateTable) -> Result<Statement> {
1649    let name = object_name_to_string(&ct.name);
1650    let if_not_exists = ct.if_not_exists;
1651    let strict = ct.strict;
1652    let temporary = ct.temporary;
1653
1654    let mut columns = Vec::new();
1655    let mut inline_pk: Vec<String> = Vec::new();
1656    let mut foreign_keys: Vec<ForeignKeyDef> = Vec::new();
1657    let mut unique_indices: Vec<UniqueIndexDef> = Vec::new();
1658
1659    for col_def in &ct.columns {
1660        let (spec, fk_def, was_pk, was_unique) = convert_column_def(col_def)?;
1661        if was_pk {
1662            inline_pk.push(spec.name.clone());
1663        }
1664        if let Some(fk) = fk_def {
1665            foreign_keys.push(fk);
1666        }
1667        if was_unique && !was_pk {
1668            unique_indices.push(UniqueIndexDef {
1669                name: None,
1670                columns: vec![spec.name.to_ascii_lowercase()],
1671            });
1672        }
1673        columns.push(spec);
1674    }
1675
1676    let mut check_constraints: Vec<TableCheckConstraint> = Vec::new();
1677
1678    for constraint in &ct.constraints {
1679        match constraint {
1680            sp::TableConstraint::PrimaryKey(pk_constraint) => {
1681                for idx_col in &pk_constraint.columns {
1682                    let col_name = match &idx_col.column.expr {
1683                        sp::Expr::Identifier(ident) => ident.value.clone(),
1684                        _ => continue,
1685                    };
1686                    if !inline_pk.contains(&col_name) {
1687                        inline_pk.push(col_name.clone());
1688                    }
1689                    if let Some(col) = columns.iter_mut().find(|c| c.name == col_name) {
1690                        col.nullable = false;
1691                        col.is_primary_key = true;
1692                    }
1693                }
1694            }
1695            sp::TableConstraint::Check(check) => {
1696                let sql = check.expr.to_string();
1697                let converted = convert_expr(&check.expr)?;
1698                if has_subquery(&converted) {
1699                    return Err(SqlError::Unsupported("subquery in CHECK constraint".into()));
1700                }
1701                check_constraints.push(TableCheckConstraint {
1702                    name: check.name.as_ref().map(|n| n.value.clone()),
1703                    expr: converted,
1704                    sql,
1705                });
1706            }
1707            sp::TableConstraint::ForeignKey(fk) => {
1708                let (on_delete, on_update) = convert_fk_actions(&fk.on_delete, &fk.on_update)?;
1709                let (deferrable, initially_deferred) =
1710                    convert_fk_characteristics(&fk.characteristics);
1711                let cols: Vec<String> = fk
1712                    .columns
1713                    .iter()
1714                    .map(|i| i.value.to_ascii_lowercase())
1715                    .collect();
1716                let ftable = object_name_to_string(&fk.foreign_table).to_ascii_lowercase();
1717                let referred: Vec<String> = fk
1718                    .referred_columns
1719                    .iter()
1720                    .map(|i| i.value.to_ascii_lowercase())
1721                    .collect();
1722                foreign_keys.push(ForeignKeyDef {
1723                    name: fk.name.as_ref().map(|n| n.value.clone()),
1724                    columns: cols,
1725                    foreign_table: ftable,
1726                    referred_columns: referred,
1727                    on_delete,
1728                    on_update,
1729                    deferrable,
1730                    initially_deferred,
1731                });
1732            }
1733            sp::TableConstraint::Unique(u) => {
1734                let cols: Vec<String> = u
1735                    .columns
1736                    .iter()
1737                    .filter_map(|idx_col| match &idx_col.column.expr {
1738                        sp::Expr::Identifier(ident) => Some(ident.value.to_ascii_lowercase()),
1739                        _ => None,
1740                    })
1741                    .collect();
1742                if !cols.is_empty() {
1743                    unique_indices.push(UniqueIndexDef {
1744                        name: u.name.as_ref().map(|n| n.value.clone()),
1745                        columns: cols,
1746                    });
1747                }
1748            }
1749            _ => {}
1750        }
1751    }
1752
1753    Ok(Statement::CreateTable(CreateTableStmt {
1754        name,
1755        columns,
1756        primary_key: inline_pk,
1757        if_not_exists,
1758        check_constraints,
1759        foreign_keys,
1760        unique_indices,
1761        strict,
1762        temporary,
1763    }))
1764}
1765
1766fn convert_alter_table(at: sp::AlterTable) -> Result<Statement> {
1767    let table = object_name_to_string(&at.name);
1768    if at.operations.len() != 1 {
1769        return Err(SqlError::Unsupported(
1770            "ALTER TABLE with multiple operations".into(),
1771        ));
1772    }
1773    let op = match at.operations.into_iter().next().unwrap() {
1774        sp::AlterTableOperation::AddColumn {
1775            column_def,
1776            if_not_exists,
1777            ..
1778        } => {
1779            let (spec, fk, _was_pk, _was_unique) = convert_column_def(&column_def)?;
1780            AlterTableOp::AddColumn {
1781                column: Box::new(spec),
1782                foreign_key: fk,
1783                if_not_exists,
1784            }
1785        }
1786        sp::AlterTableOperation::DropColumn {
1787            column_names,
1788            if_exists,
1789            ..
1790        } => {
1791            if column_names.len() != 1 {
1792                return Err(SqlError::Unsupported(
1793                    "DROP COLUMN with multiple columns".into(),
1794                ));
1795            }
1796            AlterTableOp::DropColumn {
1797                name: column_names.into_iter().next().unwrap().value,
1798                if_exists,
1799            }
1800        }
1801        sp::AlterTableOperation::RenameColumn {
1802            old_column_name,
1803            new_column_name,
1804        } => AlterTableOp::RenameColumn {
1805            old_name: old_column_name.value,
1806            new_name: new_column_name.value,
1807        },
1808        sp::AlterTableOperation::RenameTable { table_name } => {
1809            let new_name = match table_name {
1810                sp::RenameTableNameKind::To(name) | sp::RenameTableNameKind::As(name) => {
1811                    object_name_to_string(&name)
1812                }
1813            };
1814            AlterTableOp::RenameTable { new_name }
1815        }
1816        sp::AlterTableOperation::DisableTrigger { name } => {
1817            if name.value.eq_ignore_ascii_case("all") {
1818                AlterTableOp::DisableAllTriggers
1819            } else {
1820                AlterTableOp::DisableTrigger {
1821                    name: name.value.to_ascii_lowercase(),
1822                }
1823            }
1824        }
1825        sp::AlterTableOperation::EnableTrigger { name } => {
1826            if name.value.eq_ignore_ascii_case("all") {
1827                AlterTableOp::EnableAllTriggers
1828            } else {
1829                AlterTableOp::EnableTrigger {
1830                    name: name.value.to_ascii_lowercase(),
1831                }
1832            }
1833        }
1834        other => {
1835            return Err(SqlError::Unsupported(format!(
1836                "ALTER TABLE operation: {other}"
1837            )));
1838        }
1839    };
1840    Ok(Statement::AlterTable(Box::new(AlterTableStmt {
1841        table,
1842        op,
1843    })))
1844}
1845
1846fn convert_fk_actions(
1847    on_delete: &Option<sp::ReferentialAction>,
1848    on_update: &Option<sp::ReferentialAction>,
1849) -> Result<(ReferentialAction, ReferentialAction)> {
1850    Ok((convert_fk_action(on_delete)?, convert_fk_action(on_update)?))
1851}
1852
1853fn convert_fk_action(action: &Option<sp::ReferentialAction>) -> Result<ReferentialAction> {
1854    match action {
1855        None | Some(sp::ReferentialAction::NoAction) => Ok(ReferentialAction::NoAction),
1856        Some(sp::ReferentialAction::Restrict) => Ok(ReferentialAction::Restrict),
1857        Some(sp::ReferentialAction::Cascade) => Ok(ReferentialAction::Cascade),
1858        Some(sp::ReferentialAction::SetNull) => Ok(ReferentialAction::SetNull),
1859        Some(sp::ReferentialAction::SetDefault) => Ok(ReferentialAction::SetDefault),
1860    }
1861}
1862
1863fn convert_fk_characteristics(ch: &Option<sp::ConstraintCharacteristics>) -> (bool, bool) {
1864    let Some(c) = ch else {
1865        return (false, false);
1866    };
1867    let deferrable = c.deferrable.unwrap_or(false);
1868    let initially_deferred = matches!(c.initially, Some(sp::DeferrableInitial::Deferred));
1869    (deferrable, initially_deferred)
1870}
1871
1872fn convert_create_index(ci: sp::CreateIndex) -> Result<Statement> {
1873    let index_name = ci
1874        .name
1875        .as_ref()
1876        .map(object_name_to_string)
1877        .ok_or_else(|| SqlError::Parse("index name required".into()))?;
1878
1879    let table_name = object_name_to_string(&ci.table_name);
1880
1881    let mut columns: Vec<String> = Vec::with_capacity(ci.columns.len());
1882    let mut collations: Vec<crate::types::Collation> = Vec::with_capacity(ci.columns.len());
1883    let mut key_exprs: Vec<Option<(Expr, String)>> = Vec::with_capacity(ci.columns.len());
1884    for idx_col in &ci.columns {
1885        let (name, coll, expr_entry) = match &idx_col.column.expr {
1886            sp::Expr::Identifier(ident) => {
1887                (ident.value.clone(), crate::types::Collation::Binary, None)
1888            }
1889            sp::Expr::Collate {
1890                expr: inner,
1891                collation,
1892            } => match inner.as_ref() {
1893                sp::Expr::Identifier(ident) => {
1894                    let coll_name = object_name_to_string(collation);
1895                    let coll = crate::types::Collation::from_name(&coll_name).ok_or_else(|| {
1896                        SqlError::Unsupported(format!(
1897                            "collation '{coll_name}' not supported (BINARY/NOCASE/RTRIM only)"
1898                        ))
1899                    })?;
1900                    (ident.value.clone(), coll, None)
1901                }
1902                inner_expr => {
1903                    let sql = inner_expr.to_string();
1904                    let expr = convert_expr(inner_expr)?;
1905                    (
1906                        sql.clone(),
1907                        crate::types::Collation::Binary,
1908                        Some((expr, sql)),
1909                    )
1910                }
1911            },
1912            other => {
1913                let sql = other.to_string();
1914                let expr = convert_expr(other)?;
1915                (
1916                    sql.clone(),
1917                    crate::types::Collation::Binary,
1918                    Some((expr, sql)),
1919                )
1920            }
1921        };
1922        columns.push(name);
1923        collations.push(coll);
1924        key_exprs.push(expr_entry);
1925    }
1926
1927    if columns.is_empty() {
1928        return Err(SqlError::Parse(
1929            "index must have at least one column".into(),
1930        ));
1931    }
1932
1933    let (predicate_sql, predicate_expr) = match &ci.predicate {
1934        Some(sp_expr) => {
1935            let expr = convert_expr(sp_expr)?;
1936            validate_partial_index_predicate(&expr)?;
1937            (Some(sp_expr.to_string()), Some(expr))
1938        }
1939        None => (None, None),
1940    };
1941
1942    let mut ann_filter_cols: Vec<String> = Vec::new();
1943    let kind = match &ci.using {
1944        None => crate::types::IndexKind::BTree,
1945        Some(sp::IndexType::BTree) => crate::types::IndexKind::BTree,
1946        Some(sp::IndexType::GIN) => {
1947            let ops = parse_gin_with_ops(&ci.with)?;
1948            crate::types::IndexKind::Inverted(crate::types::InvertedKind::Gin(ops))
1949        }
1950        Some(sp::IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("fts") => {
1951            let config_id = parse_fts_with_config(&ci.with)?;
1952            crate::types::IndexKind::Inverted(crate::types::InvertedKind::Fts { config_id })
1953        }
1954        Some(sp::IndexType::Custom(ident)) if ident.value.eq_ignore_ascii_case("ann") => {
1955            let (metric, filter_cols) = parse_ann_with_opts(&ci.with)?;
1956            ann_filter_cols = filter_cols;
1957            crate::types::IndexKind::Inverted(crate::types::InvertedKind::Ann { metric })
1958        }
1959        Some(other) => {
1960            return Err(SqlError::Unsupported(format!(
1961                "index method {other}; supported: BTREE, GIN, FTS, ANN"
1962            )));
1963        }
1964    };
1965
1966    Ok(Statement::CreateIndex(CreateIndexStmt {
1967        index_name,
1968        table_name,
1969        columns,
1970        key_exprs,
1971        unique: ci.unique,
1972        if_not_exists: ci.if_not_exists,
1973        predicate_sql,
1974        predicate_expr,
1975        collations,
1976        kind,
1977        ann_filter_cols,
1978        concurrently: ci.concurrently,
1979    }))
1980}
1981
1982fn parse_gin_with_ops(with: &[sp::Expr]) -> Result<crate::types::GinOpsClass> {
1983    use crate::types::GinOpsClass;
1984    let mut ops_name: Option<String> = None;
1985    for expr in with {
1986        match expr {
1987            sp::Expr::BinaryOp {
1988                left,
1989                op: sp::BinaryOperator::Eq,
1990                right,
1991            } => {
1992                let key = match left.as_ref() {
1993                    sp::Expr::Identifier(id) => id.value.to_ascii_lowercase(),
1994                    other => {
1995                        return Err(SqlError::Unsupported(format!(
1996                            "GIN WITH option key: {other}"
1997                        )));
1998                    }
1999                };
2000                if key != "ops" {
2001                    return Err(SqlError::Unsupported(format!(
2002                        "GIN WITH option: unknown key '{key}' (only 'ops' supported)"
2003                    )));
2004                }
2005                let val = match right.as_ref() {
2006                    sp::Expr::Value(v) => match &v.value {
2007                        sp::Value::SingleQuotedString(s) => s.clone(),
2008                        sp::Value::DoubleQuotedString(s) => s.clone(),
2009                        other => {
2010                            return Err(SqlError::Parse(format!(
2011                                "GIN ops value must be a string literal, got: {other}"
2012                            )))
2013                        }
2014                    },
2015                    sp::Expr::Identifier(id) => id.value.clone(),
2016                    other => {
2017                        return Err(SqlError::Parse(format!(
2018                            "GIN ops value must be a string literal, got: {other}"
2019                        )));
2020                    }
2021                };
2022                ops_name = Some(val);
2023            }
2024            other => {
2025                return Err(SqlError::Unsupported(format!(
2026                    "GIN WITH option must be `key = value`, got: {other}"
2027                )));
2028            }
2029        }
2030    }
2031    let lower = ops_name.as_deref().map(|s| s.to_ascii_lowercase());
2032    match lower.as_deref() {
2033        None | Some("jsonb_ops") => Ok(GinOpsClass::JsonbOps),
2034        Some("jsonb_path_ops") => Ok(GinOpsClass::JsonbPathOps),
2035        Some(other) => Err(SqlError::Unsupported(format!(
2036            "GIN opclass '{other}'; supported: jsonb_ops, jsonb_path_ops"
2037        ))),
2038    }
2039}
2040
2041fn parse_fts_with_config(with: &[sp::Expr]) -> Result<u8> {
2042    let mut config_name: Option<String> = None;
2043    for expr in with {
2044        match expr {
2045            sp::Expr::BinaryOp {
2046                left,
2047                op: sp::BinaryOperator::Eq,
2048                right,
2049            } => {
2050                let key = match left.as_ref() {
2051                    sp::Expr::Identifier(id) => id.value.to_ascii_lowercase(),
2052                    other => {
2053                        return Err(SqlError::Unsupported(format!(
2054                            "FTS WITH option key: {other}"
2055                        )));
2056                    }
2057                };
2058                if key != "config" {
2059                    return Err(SqlError::Unsupported(format!(
2060                        "FTS WITH option: unknown key '{key}' (only 'config' supported)"
2061                    )));
2062                }
2063                let val = match right.as_ref() {
2064                    sp::Expr::Value(v) => match &v.value {
2065                        sp::Value::SingleQuotedString(s) => s.clone(),
2066                        sp::Value::DoubleQuotedString(s) => s.clone(),
2067                        other => {
2068                            return Err(SqlError::Parse(format!(
2069                                "FTS config value must be a string literal, got: {other}"
2070                            )))
2071                        }
2072                    },
2073                    sp::Expr::Identifier(id) => id.value.clone(),
2074                    other => {
2075                        return Err(SqlError::Parse(format!(
2076                            "FTS config value must be a string literal, got: {other}"
2077                        )));
2078                    }
2079                };
2080                config_name = Some(val);
2081            }
2082            other => {
2083                return Err(SqlError::Unsupported(format!(
2084                    "FTS WITH option must be `key = value`, got: {other}"
2085                )));
2086            }
2087        }
2088    }
2089    let name = config_name.unwrap_or_else(|| "english".to_string());
2090    Ok(crate::fts::TokenizerKind::from_name(&name)?.as_config_id())
2091}
2092
2093/// Parse the ANN `WITH (...)` options. Supported keys: `metric` (l2|inner|cosine)
2094/// and `filters` (comma-separated low-cardinality column names pushed into the
2095/// PRISM cell filter). Returns the metric and the raw, lowercased filter names.
2096fn parse_ann_with_opts(with: &[sp::Expr]) -> Result<(crate::types::AnnMetric, Vec<String>)> {
2097    use crate::types::AnnMetric;
2098    let mut metric_name: Option<String> = None;
2099    let mut filter_cols: Vec<String> = Vec::new();
2100    for expr in with {
2101        match expr {
2102            sp::Expr::BinaryOp {
2103                left,
2104                op: sp::BinaryOperator::Eq,
2105                right,
2106            } => {
2107                let key = match left.as_ref() {
2108                    sp::Expr::Identifier(id) => id.value.to_ascii_lowercase(),
2109                    other => {
2110                        return Err(SqlError::Unsupported(format!(
2111                            "ANN WITH option key: {other}"
2112                        )));
2113                    }
2114                };
2115                let val = match right.as_ref() {
2116                    sp::Expr::Value(v) => match &v.value {
2117                        sp::Value::SingleQuotedString(s) => s.clone(),
2118                        sp::Value::DoubleQuotedString(s) => s.clone(),
2119                        other => {
2120                            return Err(SqlError::Parse(format!(
2121                                "ANN '{key}' value must be a string literal, got: {other}"
2122                            )))
2123                        }
2124                    },
2125                    sp::Expr::Identifier(id) => id.value.clone(),
2126                    other => {
2127                        return Err(SqlError::Parse(format!(
2128                            "ANN '{key}' value must be a string literal, got: {other}"
2129                        )));
2130                    }
2131                };
2132                match key.as_str() {
2133                    "metric" => metric_name = Some(val),
2134                    "filters" => {
2135                        filter_cols = val
2136                            .split(',')
2137                            .map(|s| s.trim().to_ascii_lowercase())
2138                            .filter(|s| !s.is_empty())
2139                            .collect();
2140                    }
2141                    other => {
2142                        return Err(SqlError::Unsupported(format!(
2143                            "ANN WITH option: unknown key '{other}' (supported: metric, filters)"
2144                        )));
2145                    }
2146                }
2147            }
2148            other => {
2149                return Err(SqlError::Unsupported(format!(
2150                    "ANN WITH option must be `key = value`, got: {other}"
2151                )));
2152            }
2153        }
2154    }
2155    let lower = metric_name.as_deref().map(|s| s.to_ascii_lowercase());
2156    let metric = match lower.as_deref() {
2157        None | Some("l2") => AnnMetric::L2,
2158        Some("inner") | Some("inner_product") | Some("ip") => AnnMetric::Inner,
2159        Some("cosine") => AnnMetric::Cosine,
2160        Some(other) => {
2161            return Err(SqlError::Unsupported(format!(
2162                "ANN metric '{other}'; supported: l2, inner, cosine"
2163            )))
2164        }
2165    };
2166    Ok((metric, filter_cols))
2167}
2168
2169fn validate_partial_index_predicate(expr: &Expr) -> Result<()> {
2170    let mut bad: Option<&'static str> = None;
2171    visit_expr(expr, &mut |e| {
2172        if bad.is_some() {
2173            return;
2174        }
2175        match e {
2176            Expr::ScalarSubquery(_) | Expr::Exists { .. } | Expr::InSubquery { .. } => {
2177                bad = Some("subqueries");
2178            }
2179            Expr::CountStar => bad = Some("aggregates"),
2180            Expr::WindowFunction { .. } => bad = Some("window functions"),
2181            Expr::Parameter(_) => bad = Some("bound parameters"),
2182            Expr::QualifiedColumn { .. } => bad = Some("cross-table references"),
2183            Expr::Function { name, args, .. } => {
2184                if is_aggregate_function(name) {
2185                    bad = Some("aggregates");
2186                } else if crate::eval::is_volatile_function_expr(&name.to_ascii_uppercase(), args) {
2187                    bad = Some("non-deterministic functions");
2188                }
2189            }
2190            _ => {}
2191        }
2192    });
2193    if let Some(reason) = bad {
2194        return Err(SqlError::Unsupported(format!(
2195            "partial index predicate cannot contain {reason}"
2196        )));
2197    }
2198    Ok(())
2199}
2200
/// Reports whether `name` is one of the recognised aggregate function names.
///
/// The comparison ignores ASCII case; the name must match in full.
fn is_aggregate_function(name: &str) -> bool {
    const AGGREGATES: [&str; 8] = [
        "count",
        "sum",
        "avg",
        "min",
        "max",
        "total",
        "group_concat",
        "string_agg",
    ];
    AGGREGATES
        .iter()
        .any(|aggregate| name.eq_ignore_ascii_case(aggregate))
}
2207
2208fn convert_create_trigger(ct: sp::CreateTrigger) -> Result<Statement> {
2209    let name = object_name_to_string(&ct.name);
2210    let target = object_name_to_string(&ct.table_name);
2211
2212    let timing = match ct.period {
2213        Some(sp::TriggerPeriod::Before) => TriggerTiming::Before,
2214        Some(sp::TriggerPeriod::After) => TriggerTiming::After,
2215        Some(sp::TriggerPeriod::InsteadOf) => TriggerTiming::InsteadOf,
2216        _ => {
2217            return Err(SqlError::Parse(
2218                "CREATE TRIGGER requires BEFORE, AFTER, or INSTEAD OF".into(),
2219            ));
2220        }
2221    };
2222
2223    let mut events: Vec<TriggerEvent> = Vec::with_capacity(ct.events.len());
2224    for ev in &ct.events {
2225        let mapped = match ev {
2226            sp::TriggerEvent::Insert => TriggerEvent::Insert,
2227            sp::TriggerEvent::Delete => TriggerEvent::Delete,
2228            sp::TriggerEvent::Update(cols) => {
2229                TriggerEvent::Update(cols.iter().map(|i| i.value.to_ascii_lowercase()).collect())
2230            }
2231            sp::TriggerEvent::Truncate => {
2232                return Err(SqlError::Unsupported(
2233                    "TRUNCATE triggers are not supported".into(),
2234                ));
2235            }
2236        };
2237        events.push(mapped);
2238    }
2239    if events.is_empty() {
2240        return Err(SqlError::Parse(
2241            "CREATE TRIGGER requires at least one event (INSERT/UPDATE/DELETE)".into(),
2242        ));
2243    }
2244
2245    let granularity = match ct.trigger_object {
2246        Some(sp::TriggerObjectKind::For(sp::TriggerObject::Statement))
2247        | Some(sp::TriggerObjectKind::ForEach(sp::TriggerObject::Statement)) => {
2248            TriggerGranularity::ForEachStatement
2249        }
2250        // FOR EACH ROW is the default per SQLite semantics when omitted.
2251        _ => TriggerGranularity::ForEachRow,
2252    };
2253
2254    if timing == TriggerTiming::InsteadOf && granularity == TriggerGranularity::ForEachStatement {
2255        return Err(SqlError::Unsupported(
2256            "INSTEAD OF triggers must be FOR EACH ROW".into(),
2257        ));
2258    }
2259
2260    let mut referencing: Option<TransitionTables> = None;
2261    if !ct.referencing.is_empty() {
2262        let mut new_alias: Option<String> = None;
2263        let mut old_alias: Option<String> = None;
2264        for r in &ct.referencing {
2265            let alias = object_name_to_string(&r.transition_relation_name);
2266            match r.refer_type {
2267                sp::TriggerReferencingType::NewTable => new_alias = Some(alias),
2268                sp::TriggerReferencingType::OldTable => old_alias = Some(alias),
2269            }
2270        }
2271        referencing = Some(TransitionTables {
2272            new_table_alias: new_alias,
2273            old_table_alias: old_alias,
2274        });
2275    }
2276
2277    let when_expr = ct.condition.as_ref().map(convert_expr).transpose()?;
2278    let when_sql = ct.condition.as_ref().map(|e| e.to_string());
2279
2280    let inner_statements: &[sp::Statement] = match &ct.statements {
2281        Some(sp::ConditionalStatements::Sequence { statements })
2282        | Some(sp::ConditionalStatements::BeginEnd(sp::BeginEndStatements {
2283            statements, ..
2284        })) => statements,
2285        None => {
2286            return Err(SqlError::Parse(
2287                "CREATE TRIGGER body must contain BEGIN ... END or one or more statements".into(),
2288            ));
2289        }
2290    };
2291    let body: Vec<Statement> = inner_statements
2292        .iter()
2293        .cloned()
2294        .map(convert_statement)
2295        .collect::<Result<Vec<_>>>()?;
2296    let body_sql: String = inner_statements
2297        .iter()
2298        .map(|s| s.to_string())
2299        .collect::<Vec<_>>()
2300        .join(";");
2301
2302    Ok(Statement::CreateTrigger(Box::new(CreateTriggerStmt {
2303        name,
2304        timing,
2305        events,
2306        target,
2307        granularity,
2308        referencing,
2309        when_sql,
2310        when_expr,
2311        body_sql,
2312        body,
2313        if_not_exists: false,
2314    })))
2315}
2316
2317fn convert_create_view(cv: sp::CreateView) -> Result<Statement> {
2318    let name = object_name_to_string(&cv.name);
2319    let sql = cv.query.to_string();
2320
2321    let parsed_select = parse_select_query(&sql)?;
2322
2323    if cv.materialized {
2324        return Ok(Statement::CreateMaterializedView(Box::new(
2325            CreateMatviewStmt {
2326                name,
2327                select_sql: sql,
2328                select_parsed: parsed_select,
2329                with_data: true,
2330                if_not_exists: cv.if_not_exists,
2331            },
2332        )));
2333    }
2334
2335    let column_aliases: Vec<String> = cv
2336        .columns
2337        .iter()
2338        .map(|c| c.name.value.to_ascii_lowercase())
2339        .collect();
2340
2341    Ok(Statement::CreateView(CreateViewStmt {
2342        name,
2343        sql,
2344        column_aliases,
2345        or_replace: cv.or_replace,
2346        if_not_exists: cv.if_not_exists,
2347    }))
2348}
2349
2350fn parse_select_query(sql: &str) -> Result<SelectQuery> {
2351    let stmts = parse_sql_multi(sql)?;
2352    if stmts.len() != 1 {
2353        return Err(SqlError::Parse(
2354            "matview body must be a single SELECT statement".into(),
2355        ));
2356    }
2357    match stmts.into_iter().next().unwrap() {
2358        Statement::Select(sq) => Ok(*sq),
2359        _ => Err(SqlError::Parse(
2360            "matview body must be a SELECT statement".into(),
2361        )),
2362    }
2363}
2364
2365fn convert_insert(insert: sp::Insert) -> Result<Statement> {
2366    let table = match &insert.table {
2367        sp::TableObject::TableName(name) => object_name_to_string(name).to_ascii_lowercase(),
2368        _ => return Err(SqlError::Unsupported("INSERT into non-table object".into())),
2369    };
2370
2371    let columns: Vec<String> = insert
2372        .columns
2373        .iter()
2374        .map(|c| c.value.to_ascii_lowercase())
2375        .collect();
2376
2377    let query = insert
2378        .source
2379        .ok_or_else(|| SqlError::Parse("INSERT requires VALUES or SELECT".into()))?;
2380
2381    let source = match *query.body {
2382        sp::SetExpr::Values(sp::Values { rows, .. }) => {
2383            let mut result = Vec::new();
2384            for row in rows {
2385                let mut exprs = Vec::new();
2386                for expr in row {
2387                    exprs.push(convert_expr(&expr)?);
2388                }
2389                result.push(exprs);
2390            }
2391            InsertSource::Values(result)
2392        }
2393        _ => {
2394            let (ctes, recursive) = if let Some(ref with) = query.with {
2395                convert_with(with)?
2396            } else {
2397                (vec![], false)
2398            };
2399            let body = convert_query_body(&query)?;
2400            InsertSource::Select(Box::new(SelectQuery {
2401                ctes,
2402                recursive,
2403                body,
2404            }))
2405        }
2406    };
2407
2408    let on_conflict = insert.on.as_ref().map(convert_on_insert).transpose()?;
2409    let returning = convert_returning(insert.returning.as_deref())?;
2410
2411    Ok(Statement::Insert(InsertStmt {
2412        table,
2413        columns,
2414        source,
2415        on_conflict,
2416        returning,
2417    }))
2418}
2419
2420fn convert_on_insert(on: &sp::OnInsert) -> Result<OnConflictClause> {
2421    match on {
2422        sp::OnInsert::OnConflict(oc) => {
2423            let target = oc
2424                .conflict_target
2425                .as_ref()
2426                .map(convert_conflict_target)
2427                .transpose()?;
2428            let action = convert_on_conflict_action(&oc.action)?;
2429            Ok(OnConflictClause { target, action })
2430        }
2431        sp::OnInsert::DuplicateKeyUpdate(_) => Err(SqlError::Parse(
2432            "ON DUPLICATE KEY UPDATE is MySQL-specific; use ON CONFLICT".into(),
2433        )),
2434        _ => Err(SqlError::Parse("unsupported ON INSERT clause".into())),
2435    }
2436}
2437
2438fn convert_conflict_target(target: &sp::ConflictTarget) -> Result<ConflictTarget> {
2439    match target {
2440        sp::ConflictTarget::Columns(cols) => Ok(ConflictTarget::Columns(
2441            cols.iter().map(|c| c.value.to_ascii_lowercase()).collect(),
2442        )),
2443        sp::ConflictTarget::OnConstraint(name) => {
2444            if name.0.len() > 1 {
2445                return Err(SqlError::Parse(
2446                    "qualified constraint names not supported".into(),
2447                ));
2448            }
2449            Ok(ConflictTarget::Constraint(
2450                object_name_to_string(name).to_ascii_lowercase(),
2451            ))
2452        }
2453    }
2454}
2455
2456fn convert_on_conflict_action(action: &sp::OnConflictAction) -> Result<OnConflictAction> {
2457    match action {
2458        sp::OnConflictAction::DoNothing => Ok(OnConflictAction::DoNothing),
2459        sp::OnConflictAction::DoUpdate(du) => {
2460            let assignments = du
2461                .assignments
2462                .iter()
2463                .map(|a| {
2464                    let col = match &a.target {
2465                        sp::AssignmentTarget::ColumnName(name) => {
2466                            object_name_to_string(name).to_ascii_lowercase()
2467                        }
2468                        _ => {
2469                            return Err(SqlError::Unsupported(
2470                                "tuple assignment in ON CONFLICT".into(),
2471                            ))
2472                        }
2473                    };
2474                    let expr = convert_expr(&a.value)?;
2475                    Ok((col, expr))
2476                })
2477                .collect::<Result<_>>()?;
2478            let where_clause = du.selection.as_ref().map(convert_expr).transpose()?;
2479            Ok(OnConflictAction::DoUpdate {
2480                assignments,
2481                where_clause,
2482            })
2483        }
2484    }
2485}
2486
2487fn convert_select_body(select: &sp::Select) -> Result<SelectStmt> {
2488    let distinct = match &select.distinct {
2489        Some(sp::Distinct::Distinct) => true,
2490        Some(sp::Distinct::On(_)) => {
2491            return Err(SqlError::Unsupported("DISTINCT ON".into()));
2492        }
2493        _ => false,
2494    };
2495
2496    let (from, from_alias, from_subquery, from_args, from_json_table, joins) =
2497        if select.from.is_empty() {
2498            (String::new(), None, None, None, None, vec![])
2499        } else {
2500            let first_twj = &select.from[0];
2501            let (first_name, first_alias, first_sub, first_args, first_jt) =
2502                convert_from_relation(&first_twj.relation)?;
2503            let mut joins: Vec<JoinClause> = first_twj
2504                .joins
2505                .iter()
2506                .map(convert_join)
2507                .collect::<Result<Vec<_>>>()?;
2508            for extra_twj in &select.from[1..] {
2509                let (extra_name, extra_alias, extra_sub, extra_args, extra_jt) =
2510                    convert_from_relation(&extra_twj.relation)?;
2511                if extra_jt.is_some() {
2512                    return Err(SqlError::Unsupported(
2513                        "JSON_TABLE in extra FROM positions not supported".into(),
2514                    ));
2515                }
2516                joins.push(JoinClause {
2517                    join_type: JoinType::Cross,
2518                    table: TableRef {
2519                        name: extra_name,
2520                        alias: extra_alias,
2521                        args: extra_args,
2522                    },
2523                    subquery: extra_sub,
2524                    on_clause: None,
2525                });
2526                for j in &extra_twj.joins {
2527                    joins.push(convert_join(j)?);
2528                }
2529            }
2530            (
2531                first_name,
2532                first_alias,
2533                first_sub,
2534                first_args,
2535                first_jt,
2536                joins,
2537            )
2538        };
2539    for j in &joins {
2540        if let Some(sub) = &j.subquery {
2541            if sub.lateral && matches!(j.join_type, JoinType::Right | JoinType::FullOuter) {
2542                return Err(SqlError::Unsupported(
2543                    "LATERAL is not allowed on the right side of RIGHT JOIN or FULL OUTER JOIN"
2544                        .into(),
2545                ));
2546            }
2547        }
2548    }
2549
2550    let columns: Vec<SelectColumn> = select
2551        .projection
2552        .iter()
2553        .map(convert_select_item)
2554        .collect::<Result<_>>()?;
2555
2556    let where_clause = select.selection.as_ref().map(convert_expr).transpose()?;
2557
2558    let group_by = match &select.group_by {
2559        sp::GroupByExpr::Expressions(exprs, _) => {
2560            exprs.iter().map(convert_expr).collect::<Result<_>>()?
2561        }
2562        sp::GroupByExpr::All(_) => {
2563            return Err(SqlError::Unsupported("GROUP BY ALL".into()));
2564        }
2565    };
2566
2567    let having = select.having.as_ref().map(convert_expr).transpose()?;
2568
2569    Ok(SelectStmt {
2570        columns,
2571        from,
2572        from_alias,
2573        from_subquery,
2574        from_args,
2575        from_json_table,
2576        joins,
2577        distinct,
2578        where_clause,
2579        order_by: vec![],
2580        limit: None,
2581        offset: None,
2582        group_by,
2583        having,
2584    })
2585}
2586
2587type FromRelation = (
2588    String,
2589    Option<String>,
2590    Option<Box<DerivedTable>>,
2591    Option<Vec<Expr>>,
2592    Option<Box<JsonTableSpec>>,
2593);
2594
2595fn convert_from_relation(relation: &sp::TableFactor) -> Result<FromRelation> {
2596    match relation {
2597        sp::TableFactor::Table {
2598            name, alias, args, ..
2599        } => {
2600            let table_name = object_name_to_string(name);
2601            let alias_str = alias.as_ref().map(|a| a.name.value.clone());
2602            let args_converted = match args {
2603                Some(table_args) => {
2604                    let mut converted = Vec::with_capacity(table_args.args.len());
2605                    for arg in &table_args.args {
2606                        match arg {
2607                            sp::FunctionArg::Unnamed(sp::FunctionArgExpr::Expr(e)) => {
2608                                converted.push(convert_expr(e)?);
2609                            }
2610                            _ => {
2611                                return Err(SqlError::Unsupported(
2612                                    "non-positional table function argument".into(),
2613                                ));
2614                            }
2615                        }
2616                    }
2617                    Some(converted)
2618                }
2619                None => None,
2620            };
2621            Ok((table_name, alias_str, None, args_converted, None))
2622        }
2623        sp::TableFactor::Derived {
2624            lateral,
2625            subquery,
2626            alias,
2627            ..
2628        } => {
2629            let alias_name = match alias {
2630                Some(a) => a.name.value.clone(),
2631                None => return Err(SqlError::Unsupported("derived table requires alias".into())),
2632            };
2633            let inner = convert_select_query(subquery)?;
2634            for cte in &inner.ctes {
2635                if matches!(
2636                    &cte.body,
2637                    QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_)
2638                ) {
2639                    return Err(SqlError::Unsupported(
2640                        "WITH-DML inside subqueries (PG forbids)".into(),
2641                    ));
2642                }
2643            }
2644            let derived = DerivedTable {
2645                query: Box::new(inner),
2646                lateral: *lateral,
2647                alias: alias_name.clone(),
2648            };
2649            Ok((alias_name, None, Some(Box::new(derived)), None, None))
2650        }
2651        sp::TableFactor::JsonTable {
2652            json_expr,
2653            json_path,
2654            columns,
2655            alias,
2656        } => {
2657            let alias_name = match alias {
2658                Some(a) => a.name.value.clone(),
2659                None => "json_table".to_string(),
2660            };
2661            let source = convert_expr(json_expr)?;
2662            let root_path = json_path_value_to_string(json_path)?;
2663            let cols = columns
2664                .iter()
2665                .map(convert_json_table_column)
2666                .collect::<Result<Vec<_>>>()?;
2667            let spec = JsonTableSpec {
2668                source,
2669                root_path,
2670                columns: cols,
2671            };
2672            Ok((alias_name, None, None, None, Some(Box::new(spec))))
2673        }
2674        _ => Err(SqlError::Unsupported("non-table FROM source".into())),
2675    }
2676}
2677
2678fn json_path_value_to_string(v: &sp::Value) -> Result<String> {
2679    use sp::Value as V;
2680    match v {
2681        V::SingleQuotedString(s)
2682        | V::DoubleQuotedString(s)
2683        | V::DollarQuotedString(sp::DollarQuotedString { value: s, .. })
2684        | V::TripleSingleQuotedString(s)
2685        | V::TripleDoubleQuotedString(s) => Ok(s.clone()),
2686        other => Err(SqlError::Unsupported(format!(
2687            "JSON_TABLE path must be a string literal, got: {other}"
2688        ))),
2689    }
2690}
2691
2692fn convert_json_table_column(c: &sp::JsonTableColumn) -> Result<JsonTableCol> {
2693    match c {
2694        sp::JsonTableColumn::Named(n) => {
2695            let path = json_path_value_to_string(&n.path)?;
2696            Ok(JsonTableCol::Named {
2697                name: n.name.value.clone(),
2698                ty: convert_data_type(&n.r#type)?,
2699                path,
2700                exists: n.exists,
2701            })
2702        }
2703        sp::JsonTableColumn::ForOrdinality(ident) => Ok(JsonTableCol::Ordinality {
2704            name: ident.value.clone(),
2705        }),
2706        sp::JsonTableColumn::Nested(n) => {
2707            let path = json_path_value_to_string(&n.path)?;
2708            let columns = n
2709                .columns
2710                .iter()
2711                .map(convert_json_table_column)
2712                .collect::<Result<Vec<_>>>()?;
2713            Ok(JsonTableCol::Nested { path, columns })
2714        }
2715    }
2716}
2717
2718fn convert_set_expr(set_expr: &sp::SetExpr) -> Result<QueryBody> {
2719    match set_expr {
2720        sp::SetExpr::Select(sel) => Ok(QueryBody::Select(Box::new(convert_select_body(sel)?))),
2721        sp::SetExpr::Insert(stmt) => match convert_statement(stmt.clone())? {
2722            Statement::Insert(ins) => Ok(QueryBody::Insert(Box::new(ins))),
2723            _ => Err(SqlError::Parse("expected INSERT in WITH-DML body".into())),
2724        },
2725        sp::SetExpr::Update(stmt) => match convert_statement(stmt.clone())? {
2726            Statement::Update(upd) => Ok(QueryBody::Update(Box::new(upd))),
2727            _ => Err(SqlError::Parse("expected UPDATE in WITH-DML body".into())),
2728        },
2729        sp::SetExpr::Delete(stmt) => match convert_statement(stmt.clone())? {
2730            Statement::Delete(del) => Ok(QueryBody::Delete(Box::new(del))),
2731            _ => Err(SqlError::Parse("expected DELETE in WITH-DML body".into())),
2732        },
2733        sp::SetExpr::SetOperation {
2734            op,
2735            set_quantifier,
2736            left,
2737            right,
2738        } => {
2739            let set_op = match op {
2740                sp::SetOperator::Union => SetOp::Union,
2741                sp::SetOperator::Intersect => SetOp::Intersect,
2742                sp::SetOperator::Except | sp::SetOperator::Minus => SetOp::Except,
2743            };
2744            let all = match set_quantifier {
2745                sp::SetQuantifier::All => true,
2746                sp::SetQuantifier::None | sp::SetQuantifier::Distinct => false,
2747                _ => {
2748                    return Err(SqlError::Unsupported("BY NAME set operations".into()));
2749                }
2750            };
2751            Ok(QueryBody::Compound(Box::new(CompoundSelect {
2752                op: set_op,
2753                all,
2754                left: Box::new(convert_set_expr(left)?),
2755                right: Box::new(convert_set_expr(right)?),
2756                order_by: vec![],
2757                limit: None,
2758                offset: None,
2759            })))
2760        }
2761        _ => Err(SqlError::Unsupported("unsupported set expression".into())),
2762    }
2763}
2764
2765fn convert_query_body(query: &sp::Query) -> Result<QueryBody> {
2766    let mut body = convert_set_expr(&query.body)?;
2767
2768    let order_by = if let Some(ref ob) = query.order_by {
2769        match &ob.kind {
2770            sp::OrderByKind::Expressions(exprs) => exprs
2771                .iter()
2772                .map(convert_order_by_expr)
2773                .collect::<Result<_>>()?,
2774            sp::OrderByKind::All { .. } => {
2775                return Err(SqlError::Unsupported("ORDER BY ALL".into()));
2776            }
2777        }
2778    } else {
2779        vec![]
2780    };
2781
2782    let (limit, offset) = match &query.limit_clause {
2783        Some(sp::LimitClause::LimitOffset { limit, offset, .. }) => {
2784            let l = limit.as_ref().map(convert_expr).transpose()?;
2785            let o = offset
2786                .as_ref()
2787                .map(|o| convert_expr(&o.value))
2788                .transpose()?;
2789            (l, o)
2790        }
2791        Some(sp::LimitClause::OffsetCommaLimit { limit, offset }) => {
2792            let l = Some(convert_expr(limit)?);
2793            let o = Some(convert_expr(offset)?);
2794            (l, o)
2795        }
2796        None => (None, None),
2797    };
2798
2799    match &mut body {
2800        QueryBody::Select(sel) => {
2801            sel.order_by = order_by;
2802            sel.limit = limit;
2803            sel.offset = offset;
2804        }
2805        QueryBody::Compound(comp) => {
2806            comp.order_by = order_by;
2807            comp.limit = limit;
2808            comp.offset = offset;
2809        }
2810        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => {
2811            if !order_by.is_empty() || limit.is_some() || offset.is_some() {
2812                return Err(SqlError::Parse(
2813                    "ORDER BY / LIMIT / OFFSET not allowed on DML CTE body".into(),
2814                ));
2815            }
2816        }
2817    }
2818
2819    Ok(body)
2820}
2821
2822fn convert_subquery(query: &sp::Query) -> Result<SelectStmt> {
2823    if query.with.is_some() {
2824        return Err(SqlError::Unsupported("CTEs in subqueries".into()));
2825    }
2826    match convert_query_body(query)? {
2827        QueryBody::Select(s) => Ok(*s),
2828        QueryBody::Compound(_) => Err(SqlError::Unsupported(
2829            "UNION/INTERSECT/EXCEPT in subqueries".into(),
2830        )),
2831        QueryBody::Insert(_) | QueryBody::Update(_) | QueryBody::Delete(_) => Err(
2832            SqlError::Unsupported("WITH-DML inside subqueries (PG forbids)".into()),
2833        ),
2834    }
2835}
2836
2837fn convert_with(with: &sp::With) -> Result<(Vec<CteDefinition>, bool)> {
2838    let mut names = rustc_hash::FxHashSet::default();
2839    let mut ctes = Vec::new();
2840    for cte in &with.cte_tables {
2841        let name = cte.alias.name.value.to_ascii_lowercase();
2842        if !names.insert(name.clone()) {
2843            return Err(SqlError::DuplicateCteName(name));
2844        }
2845        let column_aliases: Vec<String> = cte
2846            .alias
2847            .columns
2848            .iter()
2849            .map(|c| c.name.value.to_ascii_lowercase())
2850            .collect();
2851        let body = convert_query_body(&cte.query)?;
2852        ctes.push(CteDefinition {
2853            name,
2854            column_aliases,
2855            body,
2856        });
2857    }
2858    Ok((ctes, with.recursive))
2859}
2860
2861fn convert_query(query: sp::Query) -> Result<Statement> {
2862    let sq = convert_select_query(&query)?;
2863    Ok(Statement::Select(Box::new(sq)))
2864}
2865
2866fn convert_select_query(query: &sp::Query) -> Result<SelectQuery> {
2867    let (ctes, recursive) = if let Some(ref with) = query.with {
2868        convert_with(with)?
2869    } else {
2870        (vec![], false)
2871    };
2872    let body = convert_query_body(query)?;
2873    Ok(SelectQuery {
2874        ctes,
2875        recursive,
2876        body,
2877    })
2878}
2879
2880fn convert_join(join: &sp::Join) -> Result<JoinClause> {
2881    let (join_type, constraint) = match &join.join_operator {
2882        sp::JoinOperator::Inner(c) => (JoinType::Inner, Some(c)),
2883        sp::JoinOperator::Join(c) => (JoinType::Inner, Some(c)),
2884        sp::JoinOperator::CrossJoin(c) => (JoinType::Cross, Some(c)),
2885        sp::JoinOperator::Left(c) | sp::JoinOperator::LeftOuter(c) => (JoinType::Left, Some(c)),
2886        sp::JoinOperator::LeftSemi(c) => (JoinType::Left, Some(c)),
2887        sp::JoinOperator::LeftAnti(c) => (JoinType::Left, Some(c)),
2888        sp::JoinOperator::Right(c) | sp::JoinOperator::RightOuter(c) => (JoinType::Right, Some(c)),
2889        sp::JoinOperator::RightSemi(c) => (JoinType::Right, Some(c)),
2890        sp::JoinOperator::RightAnti(c) => (JoinType::Right, Some(c)),
2891        sp::JoinOperator::FullOuter(c) => (JoinType::FullOuter, Some(c)),
2892        other => return Err(SqlError::Unsupported(format!("join type: {other:?}"))),
2893    };
2894
2895    let (name, alias, subquery, args, json_table) = convert_from_relation(&join.relation)?;
2896    if json_table.is_some() {
2897        return Err(SqlError::Unsupported(
2898            "JSON_TABLE on right side of JOIN".into(),
2899        ));
2900    }
2901
2902    let on_clause = match constraint {
2903        Some(sp::JoinConstraint::On(expr)) => Some(convert_expr(expr)?),
2904        Some(sp::JoinConstraint::None) | None => None,
2905        Some(other) => return Err(SqlError::Unsupported(format!("join constraint: {other:?}"))),
2906    };
2907
2908    Ok(JoinClause {
2909        join_type,
2910        table: TableRef { name, alias, args },
2911        subquery,
2912        on_clause,
2913    })
2914}
2915
2916fn convert_update(update: sp::Update) -> Result<Statement> {
2917    let table = match &update.table.relation {
2918        sp::TableFactor::Table { name, .. } => object_name_to_string(name),
2919        _ => return Err(SqlError::Unsupported("non-table UPDATE target".into())),
2920    };
2921
2922    let assignments = update
2923        .assignments
2924        .iter()
2925        .map(|a| {
2926            let col = match &a.target {
2927                sp::AssignmentTarget::ColumnName(name) => object_name_to_string(name),
2928                _ => return Err(SqlError::Unsupported("tuple assignment".into())),
2929            };
2930            let expr = convert_expr(&a.value)?;
2931            Ok((col, expr))
2932        })
2933        .collect::<Result<_>>()?;
2934
2935    let where_clause = update.selection.as_ref().map(convert_expr).transpose()?;
2936    let returning = convert_returning(update.returning.as_deref())?;
2937
2938    Ok(Statement::Update(UpdateStmt {
2939        table,
2940        assignments,
2941        where_clause,
2942        returning,
2943    }))
2944}
2945
2946fn convert_truncate(t: sp::Truncate) -> Result<Statement> {
2947    if matches!(t.cascade, Some(sp::CascadeOption::Cascade)) {
2948        return Err(SqlError::Unsupported("TRUNCATE CASCADE".into()));
2949    }
2950    if t.if_exists {
2951        return Err(SqlError::Unsupported("TRUNCATE IF EXISTS".into()));
2952    }
2953    if t.partitions.is_some() {
2954        return Err(SqlError::Unsupported("TRUNCATE PARTITION".into()));
2955    }
2956    if t.on_cluster.is_some() {
2957        return Err(SqlError::Unsupported("TRUNCATE ON CLUSTER".into()));
2958    }
2959    if t.table_names.is_empty() {
2960        return Err(SqlError::Parse(
2961            "TRUNCATE requires at least one table".into(),
2962        ));
2963    }
2964
2965    let tables: Vec<String> = t
2966        .table_names
2967        .iter()
2968        .map(|tt| object_name_to_string(&tt.name))
2969        .collect();
2970
2971    Ok(Statement::Truncate(TruncateStmt { tables }))
2972}
2973
2974fn convert_delete(delete: sp::Delete) -> Result<Statement> {
2975    let table_name = match &delete.from {
2976        sp::FromTable::WithFromKeyword(tables) => {
2977            if tables.len() != 1 {
2978                return Err(SqlError::Unsupported("multi-table DELETE".into()));
2979            }
2980            match &tables[0].relation {
2981                sp::TableFactor::Table { name, .. } => object_name_to_string(name),
2982                _ => return Err(SqlError::Unsupported("non-table DELETE target".into())),
2983            }
2984        }
2985        sp::FromTable::WithoutKeyword(tables) => {
2986            if tables.len() != 1 {
2987                return Err(SqlError::Unsupported("multi-table DELETE".into()));
2988            }
2989            match &tables[0].relation {
2990                sp::TableFactor::Table { name, .. } => object_name_to_string(name),
2991                _ => return Err(SqlError::Unsupported("non-table DELETE target".into())),
2992            }
2993        }
2994    };
2995
2996    let where_clause = delete.selection.as_ref().map(convert_expr).transpose()?;
2997    let returning = convert_returning(delete.returning.as_deref())?;
2998
2999    Ok(Statement::Delete(DeleteStmt {
3000        table: table_name,
3001        where_clause,
3002        returning,
3003    }))
3004}
3005
3006fn convert_expr(expr: &sp::Expr) -> Result<Expr> {
3007    match expr {
3008        sp::Expr::Value(v) => convert_value(&v.value),
3009        sp::Expr::Identifier(ident) => Ok(Expr::Column(ident.value.to_ascii_lowercase())),
3010        sp::Expr::CompoundIdentifier(parts) => {
3011            if parts.len() == 2 {
3012                Ok(Expr::QualifiedColumn {
3013                    table: parts[0].value.to_ascii_lowercase(),
3014                    column: parts[1].value.to_ascii_lowercase(),
3015                })
3016            } else {
3017                Ok(Expr::Column(
3018                    parts.last().unwrap().value.to_ascii_lowercase(),
3019                ))
3020            }
3021        }
3022        sp::Expr::BinaryOp { left, op, right } => {
3023            let bin_op = convert_bin_op(op)?;
3024            Ok(Expr::BinaryOp {
3025                left: Box::new(convert_expr(left)?),
3026                op: bin_op,
3027                right: Box::new(convert_expr(right)?),
3028            })
3029        }
3030        sp::Expr::UnaryOp { op, expr } => {
3031            let unary_op = match op {
3032                sp::UnaryOperator::Minus => UnaryOp::Neg,
3033                sp::UnaryOperator::Not => UnaryOp::Not,
3034                _ => return Err(SqlError::Unsupported(format!("unary op: {op}"))),
3035            };
3036            Ok(Expr::UnaryOp {
3037                op: unary_op,
3038                expr: Box::new(convert_expr(expr)?),
3039            })
3040        }
3041        sp::Expr::IsNull(e) => Ok(Expr::IsNull(Box::new(convert_expr(e)?))),
3042        sp::Expr::IsNotNull(e) => Ok(Expr::IsNotNull(Box::new(convert_expr(e)?))),
3043        sp::Expr::Nested(e) => convert_expr(e),
3044        sp::Expr::Function(func) => convert_function(func),
3045        sp::Expr::InSubquery {
3046            expr: e,
3047            subquery,
3048            negated,
3049        } => {
3050            let inner_expr = convert_expr(e)?;
3051            let stmt = convert_subquery(subquery)?;
3052            Ok(Expr::InSubquery {
3053                expr: Box::new(inner_expr),
3054                subquery: Box::new(stmt),
3055                negated: *negated,
3056            })
3057        }
3058        sp::Expr::InList {
3059            expr: e,
3060            list,
3061            negated,
3062        } => {
3063            let inner_expr = convert_expr(e)?;
3064            let items = list.iter().map(convert_expr).collect::<Result<Vec<_>>>()?;
3065            Ok(Expr::InList {
3066                expr: Box::new(inner_expr),
3067                list: items,
3068                negated: *negated,
3069            })
3070        }
3071        sp::Expr::Exists { subquery, negated } => {
3072            let stmt = convert_subquery(subquery)?;
3073            Ok(Expr::Exists {
3074                subquery: Box::new(stmt),
3075                negated: *negated,
3076            })
3077        }
3078        sp::Expr::Subquery(query) => {
3079            let stmt = convert_subquery(query)?;
3080            Ok(Expr::ScalarSubquery(Box::new(stmt)))
3081        }
3082        sp::Expr::AnyOp {
3083            left,
3084            compare_op,
3085            right,
3086            ..
3087        } => convert_quantified(left, compare_op, right, Quantifier::Any),
3088        sp::Expr::AllOp {
3089            left,
3090            compare_op,
3091            right,
3092        } => convert_quantified(left, compare_op, right, Quantifier::All),
3093        sp::Expr::Array(sp::Array { elem, .. }) => {
3094            let elems: Result<Vec<Expr>> = elem.iter().map(convert_expr).collect();
3095            Ok(Expr::ArrayLiteral(elems?))
3096        }
3097        sp::Expr::Between {
3098            expr: e,
3099            negated,
3100            low,
3101            high,
3102        } => Ok(Expr::Between {
3103            expr: Box::new(convert_expr(e)?),
3104            low: Box::new(convert_expr(low)?),
3105            high: Box::new(convert_expr(high)?),
3106            negated: *negated,
3107        }),
3108        sp::Expr::Like {
3109            expr: e,
3110            negated,
3111            pattern,
3112            escape_char,
3113            ..
3114        } => {
3115            let esc = escape_char
3116                .as_ref()
3117                .map(convert_escape_value)
3118                .transpose()?
3119                .map(Box::new);
3120            Ok(Expr::Like {
3121                expr: Box::new(convert_expr(e)?),
3122                pattern: Box::new(convert_expr(pattern)?),
3123                escape: esc,
3124                negated: *negated,
3125            })
3126        }
3127        sp::Expr::ILike {
3128            expr: e,
3129            negated,
3130            pattern,
3131            escape_char,
3132            ..
3133        } => {
3134            let esc = escape_char
3135                .as_ref()
3136                .map(convert_escape_value)
3137                .transpose()?
3138                .map(Box::new);
3139            Ok(Expr::Like {
3140                expr: Box::new(convert_expr(e)?),
3141                pattern: Box::new(convert_expr(pattern)?),
3142                escape: esc,
3143                negated: *negated,
3144            })
3145        }
3146        sp::Expr::Case {
3147            operand,
3148            conditions,
3149            else_result,
3150            ..
3151        } => {
3152            let op = operand
3153                .as_ref()
3154                .map(|e| convert_expr(e))
3155                .transpose()?
3156                .map(Box::new);
3157            let conds: Vec<(Expr, Expr)> = conditions
3158                .iter()
3159                .map(|cw| Ok((convert_expr(&cw.condition)?, convert_expr(&cw.result)?)))
3160                .collect::<Result<_>>()?;
3161            let else_r = else_result
3162                .as_ref()
3163                .map(|e| convert_expr(e))
3164                .transpose()?
3165                .map(Box::new);
3166            Ok(Expr::Case {
3167                operand: op,
3168                conditions: conds,
3169                else_result: else_r,
3170            })
3171        }
3172        sp::Expr::Cast {
3173            expr: e,
3174            data_type: dt,
3175            ..
3176        } => {
3177            if let (sp::DataType::Custom(name, modifiers), sp::Expr::Value(v)) = (dt, e.as_ref()) {
3178                if modifiers.is_empty() && name.0.len() == 1 && matches!(v.value, sp::Value::Null) {
3179                    if let sp::ObjectNamePart::Identifier(id) = &name.0[0] {
3180                        return Ok(Expr::TypedNullRecord(id.value.clone()));
3181                    }
3182                }
3183            }
3184            let target = convert_data_type(dt)?;
3185            let inner = convert_expr(e)?;
3186            if matches!(target, DataType::Json | DataType::Jsonb) {
3187                if let Expr::Literal(Value::Text(s)) = &inner {
3188                    let v = if matches!(target, DataType::Json) {
3189                        crate::json::validate_text(s.as_str())?;
3190                        Value::Json(s.clone())
3191                    } else {
3192                        crate::json::text_to_jsonb(s.as_str())?
3193                    };
3194                    return Ok(Expr::Literal(v));
3195                }
3196            }
3197            Ok(Expr::Cast {
3198                expr: Box::new(inner),
3199                data_type: target,
3200            })
3201        }
3202        sp::Expr::Collate {
3203            expr: e,
3204            collation: name,
3205        } => {
3206            let coll_name = object_name_to_string(name);
3207            let coll = crate::types::Collation::from_name(&coll_name).ok_or_else(|| {
3208                SqlError::Unsupported(format!(
3209                    "collation '{coll_name}' not supported (BINARY/NOCASE/RTRIM only)"
3210                ))
3211            })?;
3212            Ok(Expr::Collate {
3213                expr: Box::new(convert_expr(e)?),
3214                collation: coll,
3215            })
3216        }
3217        sp::Expr::Substring {
3218            expr: e,
3219            substring_from,
3220            substring_for,
3221            ..
3222        } => {
3223            let mut args = vec![convert_expr(e)?];
3224            if let Some(from) = substring_from {
3225                args.push(convert_expr(from)?);
3226            }
3227            if let Some(f) = substring_for {
3228                args.push(convert_expr(f)?);
3229            }
3230            Ok(Expr::Function {
3231                name: "SUBSTR".into(),
3232                args,
3233                distinct: false,
3234            })
3235        }
3236        sp::Expr::Trim {
3237            expr: e,
3238            trim_where,
3239            trim_what,
3240            trim_characters,
3241        } => {
3242            let fn_name = match trim_where {
3243                Some(sp::TrimWhereField::Leading) => "LTRIM",
3244                Some(sp::TrimWhereField::Trailing) => "RTRIM",
3245                _ => "TRIM",
3246            };
3247            let mut args = vec![convert_expr(e)?];
3248            if let Some(what) = trim_what {
3249                args.push(convert_expr(what)?);
3250            } else if let Some(chars) = trim_characters {
3251                if let Some(first) = chars.first() {
3252                    args.push(convert_expr(first)?);
3253                }
3254            }
3255            Ok(Expr::Function {
3256                name: fn_name.into(),
3257                args,
3258                distinct: false,
3259            })
3260        }
3261        sp::Expr::Ceil { expr: e, .. } => Ok(Expr::Function {
3262            name: "CEIL".into(),
3263            args: vec![convert_expr(e)?],
3264            distinct: false,
3265        }),
3266        sp::Expr::Floor { expr: e, .. } => Ok(Expr::Function {
3267            name: "FLOOR".into(),
3268            args: vec![convert_expr(e)?],
3269            distinct: false,
3270        }),
3271        sp::Expr::Position { expr: e, r#in } => Ok(Expr::Function {
3272            name: "INSTR".into(),
3273            args: vec![convert_expr(r#in)?, convert_expr(e)?],
3274            distinct: false,
3275        }),
3276        sp::Expr::TypedString(ts) => {
3277            let raw = match &ts.value.value {
3278                sp::Value::SingleQuotedString(s) => s.clone(),
3279                sp::Value::DoubleQuotedString(s) => s.clone(),
3280                other => other.to_string(),
3281            };
3282            convert_typed_string(&ts.data_type, &raw)
3283        }
3284        sp::Expr::Interval(iv) => convert_interval_expr(iv),
3285        sp::Expr::Extract { field, expr: e, .. } => {
3286            let field_name = match field {
3287                sp::DateTimeField::Year => "year",
3288                sp::DateTimeField::Month => "month",
3289                sp::DateTimeField::Week(_) => "week",
3290                sp::DateTimeField::Day => "day",
3291                sp::DateTimeField::Date => "day",
3292                sp::DateTimeField::Hour => "hour",
3293                sp::DateTimeField::Minute => "minute",
3294                sp::DateTimeField::Second => "second",
3295                sp::DateTimeField::Millisecond => "milliseconds",
3296                sp::DateTimeField::Microsecond => "microseconds",
3297                sp::DateTimeField::Microseconds => "microseconds",
3298                sp::DateTimeField::Milliseconds => "milliseconds",
3299                sp::DateTimeField::Dow => "dow",
3300                sp::DateTimeField::Isodow => "isodow",
3301                sp::DateTimeField::Doy => "doy",
3302                sp::DateTimeField::Epoch => "epoch",
3303                sp::DateTimeField::Quarter => "quarter",
3304                sp::DateTimeField::Decade => "decade",
3305                sp::DateTimeField::Century => "century",
3306                sp::DateTimeField::Millennium => "millennium",
3307                sp::DateTimeField::Isoyear => "isoyear",
3308                sp::DateTimeField::Julian => "julian",
3309                other => {
3310                    return Err(SqlError::InvalidExtractField(format!("{other:?}")));
3311                }
3312            };
3313            Ok(Expr::Function {
3314                name: "EXTRACT".into(),
3315                args: vec![
3316                    Expr::Literal(Value::Text(field_name.into())),
3317                    convert_expr(e)?,
3318                ],
3319                distinct: false,
3320            })
3321        }
3322        sp::Expr::AtTimeZone {
3323            timestamp,
3324            time_zone,
3325        } => Ok(Expr::Function {
3326            name: "AT_TIMEZONE".into(),
3327            args: vec![convert_expr(timestamp)?, convert_expr(time_zone)?],
3328            distinct: false,
3329        }),
3330        _ => Err(SqlError::Unsupported(format!("expression: {expr}"))),
3331    }
3332}
3333
3334fn convert_value(val: &sp::Value) -> Result<Expr> {
3335    match val {
3336        sp::Value::Number(n, _) => {
3337            if let Ok(i) = n.parse::<i64>() {
3338                Ok(Expr::Literal(Value::Integer(i)))
3339            } else if let Ok(f) = n.parse::<f64>() {
3340                Ok(Expr::Literal(Value::Real(f)))
3341            } else {
3342                Err(SqlError::InvalidValue(format!("cannot parse number: {n}")))
3343            }
3344        }
3345        sp::Value::SingleQuotedString(s) => Ok(Expr::Literal(Value::Text(s.as_str().into()))),
3346        sp::Value::Boolean(b) => Ok(Expr::Literal(Value::Boolean(*b))),
3347        sp::Value::Null => Ok(Expr::Literal(Value::Null)),
3348        sp::Value::Placeholder(s) => {
3349            let idx_str = s
3350                .strip_prefix('$')
3351                .ok_or_else(|| SqlError::Parse(format!("invalid placeholder: {s}")))?;
3352            let idx: usize = idx_str
3353                .parse()
3354                .map_err(|_| SqlError::Parse(format!("invalid placeholder index: {s}")))?;
3355            if idx == 0 {
3356                return Err(SqlError::Parse("placeholder index must be >= 1".into()));
3357            }
3358            Ok(Expr::Parameter(idx))
3359        }
3360        _ => Err(SqlError::Unsupported(format!("value type: {val}"))),
3361    }
3362}
3363
3364fn convert_typed_string(dt: &sp::DataType, value: &str) -> Result<Expr> {
3365    let s = value.trim_matches('\'');
3366    match dt {
3367        sp::DataType::Date => {
3368            let d = crate::datetime::parse_date(s)?;
3369            Ok(Expr::Literal(Value::Date(d)))
3370        }
3371        sp::DataType::Time(_, _) => {
3372            let t = crate::datetime::parse_time(s)?;
3373            Ok(Expr::Literal(Value::Time(t)))
3374        }
3375        sp::DataType::Timestamp(_, _) => {
3376            let t = crate::datetime::parse_timestamp(s)?;
3377            Ok(Expr::Literal(Value::Timestamp(t)))
3378        }
3379        sp::DataType::Interval { .. } => {
3380            let (months, days, micros) = crate::datetime::parse_interval(s)?;
3381            Ok(Expr::Literal(Value::Interval {
3382                months,
3383                days,
3384                micros,
3385            }))
3386        }
3387        _ => {
3388            let target = convert_data_type(dt)?;
3389            Ok(Expr::Cast {
3390                expr: Box::new(Expr::Literal(Value::Text(s.into()))),
3391                data_type: target,
3392            })
3393        }
3394    }
3395}
3396
3397fn convert_interval_expr(iv: &sp::Interval) -> Result<Expr> {
3398    let raw = match iv.value.as_ref() {
3399        sp::Expr::Value(v) => match &v.value {
3400            sp::Value::SingleQuotedString(s) => s.clone(),
3401            sp::Value::Number(n, _) => n.clone(),
3402            other => {
3403                return Err(SqlError::InvalidIntervalLiteral(format!(
3404                    "unsupported inner value: {other}"
3405                )))
3406            }
3407        },
3408        other => {
3409            return Err(SqlError::InvalidIntervalLiteral(format!(
3410                "unsupported inner expr: {other}"
3411            )))
3412        }
3413    };
3414
3415    let with_unit = if let Some(field) = &iv.leading_field {
3416        let unit_name = match field {
3417            sp::DateTimeField::Year => "years",
3418            sp::DateTimeField::Month => "months",
3419            sp::DateTimeField::Week(_) => "weeks",
3420            sp::DateTimeField::Day => "days",
3421            sp::DateTimeField::Hour => "hours",
3422            sp::DateTimeField::Minute => "minutes",
3423            sp::DateTimeField::Second => "seconds",
3424            _ => {
3425                return Err(SqlError::InvalidIntervalLiteral(format!(
3426                    "unsupported leading field: {field:?}"
3427                )))
3428            }
3429        };
3430        format!("{raw} {unit_name}")
3431    } else {
3432        raw
3433    };
3434
3435    let (months, days, micros) = crate::datetime::parse_interval(&with_unit)?;
3436    Ok(Expr::Literal(Value::Interval {
3437        months,
3438        days,
3439        micros,
3440    }))
3441}
3442
3443fn convert_escape_value(val: &sp::Value) -> Result<Expr> {
3444    match val {
3445        sp::Value::SingleQuotedString(s) => Ok(Expr::Literal(Value::Text(s.as_str().into()))),
3446        _ => Err(SqlError::Unsupported(format!("ESCAPE value: {val}"))),
3447    }
3448}
3449
3450fn convert_quantified(
3451    left: &sp::Expr,
3452    compare_op: &sp::BinaryOperator,
3453    right: &sp::Expr,
3454    quantifier: Quantifier,
3455) -> Result<Expr> {
3456    let left_expr = convert_expr(left)?;
3457    let op = convert_bin_op(compare_op)?;
3458    if !matches!(
3459        op,
3460        BinOp::Eq | BinOp::NotEq | BinOp::Lt | BinOp::Gt | BinOp::LtEq | BinOp::GtEq
3461    ) {
3462        return Err(SqlError::Unsupported(format!(
3463            "ANY/ALL only supports comparison operators, got {op:?}"
3464        )));
3465    }
3466    let rhs = match right {
3467        sp::Expr::Subquery(query) => {
3468            let stmt = convert_subquery(query)?;
3469            QuantifiedRhs::Subquery(Box::new(stmt))
3470        }
3471        other => QuantifiedRhs::Array(Box::new(convert_expr(other)?)),
3472    };
3473    Ok(Expr::Quantified {
3474        left: Box::new(left_expr),
3475        op,
3476        quantifier,
3477        right: rhs,
3478    })
3479}
3480
3481fn convert_bin_op(op: &sp::BinaryOperator) -> Result<BinOp> {
3482    match op {
3483        sp::BinaryOperator::Plus => Ok(BinOp::Add),
3484        sp::BinaryOperator::Minus => Ok(BinOp::Sub),
3485        sp::BinaryOperator::Multiply => Ok(BinOp::Mul),
3486        sp::BinaryOperator::Divide => Ok(BinOp::Div),
3487        sp::BinaryOperator::Modulo => Ok(BinOp::Mod),
3488        sp::BinaryOperator::Eq => Ok(BinOp::Eq),
3489        sp::BinaryOperator::NotEq => Ok(BinOp::NotEq),
3490        sp::BinaryOperator::Lt => Ok(BinOp::Lt),
3491        sp::BinaryOperator::Gt => Ok(BinOp::Gt),
3492        sp::BinaryOperator::LtEq => Ok(BinOp::LtEq),
3493        sp::BinaryOperator::GtEq => Ok(BinOp::GtEq),
3494        sp::BinaryOperator::And => Ok(BinOp::And),
3495        sp::BinaryOperator::Or => Ok(BinOp::Or),
3496        sp::BinaryOperator::StringConcat => Ok(BinOp::Concat),
3497        sp::BinaryOperator::Arrow => Ok(BinOp::JsonGet),
3498        sp::BinaryOperator::LongArrow => Ok(BinOp::JsonGetText),
3499        sp::BinaryOperator::HashArrow => Ok(BinOp::JsonPath),
3500        sp::BinaryOperator::HashLongArrow => Ok(BinOp::JsonPathText),
3501        sp::BinaryOperator::AtArrow => Ok(BinOp::JsonContains),
3502        sp::BinaryOperator::ArrowAt => Ok(BinOp::JsonContainedBy),
3503        sp::BinaryOperator::Question => Ok(BinOp::JsonHasKey),
3504        sp::BinaryOperator::QuestionPipe => Ok(BinOp::JsonHasAnyKey),
3505        sp::BinaryOperator::QuestionAnd => Ok(BinOp::JsonHasAllKeys),
3506        sp::BinaryOperator::HashMinus => Ok(BinOp::JsonDeletePath),
3507        sp::BinaryOperator::AtQuestion => Ok(BinOp::JsonPathExists),
3508        sp::BinaryOperator::AtAt => Ok(BinOp::JsonPathMatch),
3509        sp::BinaryOperator::Custom(s) if s == "@?_tz" => Ok(BinOp::JsonPathExistsTz),
3510        sp::BinaryOperator::Custom(s) if s == "@@_tz" => Ok(BinOp::JsonPathMatchTz),
3511        sp::BinaryOperator::LtDashGt => Ok(BinOp::VectorL2),
3512        sp::BinaryOperator::Spaceship => Ok(BinOp::VectorCosine),
3513        sp::BinaryOperator::Custom(s) if s == "<#>" => Ok(BinOp::VectorInner),
3514        _ => Err(SqlError::Unsupported(format!("binary op: {op}"))),
3515    }
3516}
3517
3518fn convert_function(func: &sp::Function) -> Result<Expr> {
3519    let name = object_name_to_string(&func.name).to_ascii_uppercase();
3520
3521    let (args, is_count_star, distinct) = match &func.args {
3522        sp::FunctionArguments::List(list) => {
3523            let distinct = matches!(
3524                list.duplicate_treatment,
3525                Some(sp::DuplicateTreatment::Distinct)
3526            );
3527            if list.args.is_empty() && name == "COUNT" {
3528                (vec![], true, distinct)
3529            } else {
3530                let mut count_star = false;
3531                let args = list
3532                    .args
3533                    .iter()
3534                    .map(|arg| match arg {
3535                        sp::FunctionArg::Unnamed(sp::FunctionArgExpr::Expr(e)) => convert_expr(e),
3536                        sp::FunctionArg::Unnamed(sp::FunctionArgExpr::Wildcard) => {
3537                            if name == "COUNT" {
3538                                count_star = true;
3539                                Ok(Expr::CountStar)
3540                            } else {
3541                                Err(SqlError::Unsupported(format!("{name}(*)")))
3542                            }
3543                        }
3544                        _ => Err(SqlError::Unsupported(format!(
3545                            "function arg type in {name}"
3546                        ))),
3547                    })
3548                    .collect::<Result<Vec<_>>>()?;
3549                if name == "COUNT" && args.len() == 1 && count_star {
3550                    (vec![], true, distinct)
3551                } else {
3552                    (args, false, distinct)
3553                }
3554            }
3555        }
3556        sp::FunctionArguments::None => {
3557            if name == "COUNT" {
3558                (vec![], true, false)
3559            } else {
3560                (vec![], false, false)
3561            }
3562        }
3563        sp::FunctionArguments::Subquery(_) => {
3564            return Err(SqlError::Unsupported("subquery in function".into()));
3565        }
3566    };
3567
3568    if let Some(over) = &func.over {
3569        let spec = match over {
3570            sp::WindowType::WindowSpec(ws) => convert_window_spec(ws)?,
3571            sp::WindowType::NamedWindow(_) => {
3572                return Err(SqlError::Unsupported("named windows".into()));
3573            }
3574        };
3575        return Ok(Expr::WindowFunction { name, args, spec });
3576    }
3577
3578    if is_count_star {
3579        return Ok(Expr::CountStar);
3580    }
3581
3582    if name == "COALESCE" {
3583        if args.is_empty() {
3584            return Err(SqlError::Parse(
3585                "COALESCE requires at least one argument".into(),
3586            ));
3587        }
3588        return Ok(Expr::Coalesce(args));
3589    }
3590
3591    if name == "NULLIF" {
3592        if args.len() != 2 {
3593            return Err(SqlError::Parse(
3594                "NULLIF requires exactly two arguments".into(),
3595            ));
3596        }
3597        return Ok(Expr::Case {
3598            operand: None,
3599            conditions: vec![(
3600                Expr::BinaryOp {
3601                    left: Box::new(args[0].clone()),
3602                    op: BinOp::Eq,
3603                    right: Box::new(args[1].clone()),
3604                },
3605                Expr::Literal(Value::Null),
3606            )],
3607            else_result: Some(Box::new(args[0].clone())),
3608        });
3609    }
3610
3611    if name == "IIF" {
3612        if args.len() != 3 {
3613            return Err(SqlError::Parse(
3614                "IIF requires exactly three arguments".into(),
3615            ));
3616        }
3617        return Ok(Expr::Case {
3618            operand: None,
3619            conditions: vec![(args[0].clone(), args[1].clone())],
3620            else_result: Some(Box::new(args[2].clone())),
3621        });
3622    }
3623
3624    Ok(Expr::Function {
3625        name,
3626        args,
3627        distinct,
3628    })
3629}
3630
3631fn convert_window_spec(ws: &sp::WindowSpec) -> Result<WindowSpec> {
3632    let partition_by = ws
3633        .partition_by
3634        .iter()
3635        .map(convert_expr)
3636        .collect::<Result<Vec<_>>>()?;
3637    let order_by = ws
3638        .order_by
3639        .iter()
3640        .map(convert_order_by_expr)
3641        .collect::<Result<Vec<_>>>()?;
3642    let frame = ws
3643        .window_frame
3644        .as_ref()
3645        .map(convert_window_frame)
3646        .transpose()?;
3647    Ok(WindowSpec {
3648        partition_by,
3649        order_by,
3650        frame,
3651    })
3652}
3653
3654fn convert_window_frame(wf: &sp::WindowFrame) -> Result<WindowFrame> {
3655    let units = match wf.units {
3656        sp::WindowFrameUnits::Rows => WindowFrameUnits::Rows,
3657        sp::WindowFrameUnits::Range => WindowFrameUnits::Range,
3658        sp::WindowFrameUnits::Groups => {
3659            return Err(SqlError::Unsupported("GROUPS window frame".into()));
3660        }
3661    };
3662    let start = convert_window_frame_bound(&wf.start_bound)?;
3663    let end = match &wf.end_bound {
3664        Some(b) => convert_window_frame_bound(b)?,
3665        None => WindowFrameBound::CurrentRow,
3666    };
3667    Ok(WindowFrame { units, start, end })
3668}
3669
3670fn convert_window_frame_bound(b: &sp::WindowFrameBound) -> Result<WindowFrameBound> {
3671    match b {
3672        sp::WindowFrameBound::CurrentRow => Ok(WindowFrameBound::CurrentRow),
3673        sp::WindowFrameBound::Preceding(None) => Ok(WindowFrameBound::UnboundedPreceding),
3674        sp::WindowFrameBound::Preceding(Some(e)) => {
3675            Ok(WindowFrameBound::Preceding(Box::new(convert_expr(e)?)))
3676        }
3677        sp::WindowFrameBound::Following(None) => Ok(WindowFrameBound::UnboundedFollowing),
3678        sp::WindowFrameBound::Following(Some(e)) => {
3679            Ok(WindowFrameBound::Following(Box::new(convert_expr(e)?)))
3680        }
3681    }
3682}
3683
3684fn convert_returning(items: Option<&[sp::SelectItem]>) -> Result<Option<Vec<SelectColumn>>> {
3685    match items {
3686        None => Ok(None),
3687        Some(items) => {
3688            let cols = items
3689                .iter()
3690                .map(convert_returning_item)
3691                .collect::<Result<Vec<_>>>()?;
3692            Ok(Some(cols))
3693        }
3694    }
3695}
3696
3697fn convert_returning_item(item: &sp::SelectItem) -> Result<SelectColumn> {
3698    match item {
3699        sp::SelectItem::Wildcard(_) => Ok(SelectColumn::AllColumns),
3700        sp::SelectItem::UnnamedExpr(e) => {
3701            reject_aggregate_or_window(e, "RETURNING")?;
3702            Ok(SelectColumn::Expr {
3703                expr: convert_expr(e)?,
3704                alias: None,
3705            })
3706        }
3707        sp::SelectItem::ExprWithAlias { expr, alias } => {
3708            reject_aggregate_or_window(expr, "RETURNING")?;
3709            Ok(SelectColumn::Expr {
3710                expr: convert_expr(expr)?,
3711                alias: Some(alias.value.clone()),
3712            })
3713        }
3714        sp::SelectItem::QualifiedWildcard(kind, _) => match kind {
3715            sp::SelectItemQualifiedWildcardKind::ObjectName(name) => {
3716                let s = object_name_to_string(name);
3717                if s.eq_ignore_ascii_case("old") {
3718                    Ok(SelectColumn::AllFromOld)
3719                } else if s.eq_ignore_ascii_case("new") {
3720                    Ok(SelectColumn::AllFromNew)
3721                } else {
3722                    Err(SqlError::Unsupported(format!(
3723                        "RETURNING {s}.* — only old.* and new.* qualified wildcards allowed"
3724                    )))
3725                }
3726            }
3727            sp::SelectItemQualifiedWildcardKind::Expr(_) => {
3728                Err(SqlError::Unsupported("expression.* in RETURNING".into()))
3729            }
3730        },
3731    }
3732}
3733
3734fn reject_aggregate_or_window(expr: &sp::Expr, ctx: &str) -> Result<()> {
3735    use sp::Expr as E;
3736    match expr {
3737        E::Function(f) => {
3738            if f.over.is_some() {
3739                return Err(SqlError::Unsupported(format!(
3740                    "window functions are not allowed in {ctx}"
3741                )));
3742            }
3743            let name = f
3744                .name
3745                .0
3746                .last()
3747                .map(|p| match p {
3748                    sp::ObjectNamePart::Identifier(id) => id.value.to_ascii_uppercase(),
3749                    _ => String::new(),
3750                })
3751                .unwrap_or_default();
3752            if matches!(
3753                name.as_str(),
3754                "COUNT"
3755                    | "SUM"
3756                    | "AVG"
3757                    | "MIN"
3758                    | "MAX"
3759                    | "GROUP_CONCAT"
3760                    | "STRING_AGG"
3761                    | "ARRAY_AGG"
3762                    | "BIT_AND"
3763                    | "BIT_OR"
3764                    | "BOOL_AND"
3765                    | "BOOL_OR"
3766                    | "EVERY"
3767                    | "STDDEV"
3768                    | "STDDEV_POP"
3769                    | "STDDEV_SAMP"
3770                    | "VARIANCE"
3771                    | "VAR_POP"
3772                    | "VAR_SAMP"
3773            ) {
3774                return Err(SqlError::Unsupported(format!(
3775                    "aggregate functions are not allowed in {ctx}"
3776                )));
3777            }
3778            for arg in walk_function_args(f) {
3779                reject_aggregate_or_window(arg, ctx)?;
3780            }
3781            Ok(())
3782        }
3783        E::BinaryOp { left, right, .. } => {
3784            reject_aggregate_or_window(left, ctx)?;
3785            reject_aggregate_or_window(right, ctx)
3786        }
3787        E::UnaryOp { expr, .. } => reject_aggregate_or_window(expr, ctx),
3788        E::Cast { expr, .. } => reject_aggregate_or_window(expr, ctx),
3789        E::Nested(e) => reject_aggregate_or_window(e, ctx),
3790        E::Case {
3791            conditions,
3792            else_result,
3793            ..
3794        } => {
3795            for cwt in conditions {
3796                reject_aggregate_or_window(&cwt.condition, ctx)?;
3797                reject_aggregate_or_window(&cwt.result, ctx)?;
3798            }
3799            if let Some(e) = else_result {
3800                reject_aggregate_or_window(e, ctx)?;
3801            }
3802            Ok(())
3803        }
3804        _ => Ok(()),
3805    }
3806}
3807
3808fn walk_function_args(f: &sp::Function) -> Vec<&sp::Expr> {
3809    use sp::FunctionArguments as FA;
3810    let mut out = Vec::new();
3811    if let FA::List(args) = &f.args {
3812        for a in &args.args {
3813            if let sp::FunctionArg::Unnamed(sp::FunctionArgExpr::Expr(e)) = a {
3814                out.push(e);
3815            }
3816        }
3817    }
3818    out
3819}
3820
3821fn convert_select_item(item: &sp::SelectItem) -> Result<SelectColumn> {
3822    match item {
3823        sp::SelectItem::Wildcard(_) => Ok(SelectColumn::AllColumns),
3824        sp::SelectItem::UnnamedExpr(e) => {
3825            let expr = convert_expr(e)?;
3826            Ok(SelectColumn::Expr { expr, alias: None })
3827        }
3828        sp::SelectItem::ExprWithAlias { expr, alias } => {
3829            let expr = convert_expr(expr)?;
3830            Ok(SelectColumn::Expr {
3831                expr,
3832                alias: Some(alias.value.clone()),
3833            })
3834        }
3835        sp::SelectItem::QualifiedWildcard(_, _) => {
3836            Err(SqlError::Unsupported("qualified wildcard (table.*)".into()))
3837        }
3838    }
3839}
3840
3841fn convert_order_by_expr(expr: &sp::OrderByExpr) -> Result<OrderByItem> {
3842    let e = convert_expr(&expr.expr)?;
3843    let descending = expr.options.asc.map(|asc| !asc).unwrap_or(false);
3844    let nulls_first = expr.options.nulls_first;
3845
3846    Ok(OrderByItem {
3847        expr: e,
3848        descending,
3849        nulls_first,
3850    })
3851}
3852
3853fn convert_data_type(dt: &sp::DataType) -> Result<DataType> {
3854    match dt {
3855        sp::DataType::Int(_)
3856        | sp::DataType::Integer(_)
3857        | sp::DataType::BigInt(_)
3858        | sp::DataType::SmallInt(_)
3859        | sp::DataType::TinyInt(_)
3860        | sp::DataType::Int2(_)
3861        | sp::DataType::Int4(_)
3862        | sp::DataType::Int8(_) => Ok(DataType::Integer),
3863
3864        sp::DataType::Real
3865        | sp::DataType::Double(..)
3866        | sp::DataType::DoublePrecision
3867        | sp::DataType::Float(_)
3868        | sp::DataType::Float4
3869        | sp::DataType::Float64 => Ok(DataType::Real),
3870
3871        sp::DataType::Varchar(_)
3872        | sp::DataType::Text
3873        | sp::DataType::Char(_)
3874        | sp::DataType::Character(_)
3875        | sp::DataType::String(_) => Ok(DataType::Text),
3876
3877        sp::DataType::Blob(_) | sp::DataType::Bytea => Ok(DataType::Blob),
3878
3879        sp::DataType::Boolean | sp::DataType::Bool => Ok(DataType::Boolean),
3880
3881        sp::DataType::Date => Ok(DataType::Date),
3882        sp::DataType::Time(_, _) => Ok(DataType::Time),
3883        sp::DataType::Timestamp(_, _) => Ok(DataType::Timestamp),
3884        sp::DataType::Interval { .. } => Ok(DataType::Interval),
3885
3886        sp::DataType::JSON => Ok(DataType::Json),
3887        sp::DataType::JSONB => Ok(DataType::Jsonb),
3888
3889        sp::DataType::TsVector => Ok(DataType::TsVector),
3890        sp::DataType::TsQuery => Ok(DataType::TsQuery),
3891
3892        sp::DataType::Custom(name, modifiers) => {
3893            if name.0.len() == 1 {
3894                if let sp::ObjectNamePart::Identifier(id) = &name.0[0] {
3895                    if id.value.eq_ignore_ascii_case("vector") {
3896                        if modifiers.len() != 1 {
3897                            return Err(SqlError::Parse(
3898                                "VECTOR requires exactly one dimension argument".into(),
3899                            ));
3900                        }
3901                        let dim: u16 = modifiers[0].parse().map_err(|_| {
3902                            SqlError::Parse(format!(
3903                                "VECTOR dimension must be a positive integer, got '{}'",
3904                                modifiers[0]
3905                            ))
3906                        })?;
3907                        if dim == 0 {
3908                            return Err(SqlError::Parse("VECTOR dimension must be >= 1".into()));
3909                        }
3910                        return Ok(DataType::Vector { dim });
3911                    }
3912                }
3913            }
3914            Err(SqlError::Unsupported(format!("data type: {dt}")))
3915        }
3916
3917        _ => Err(SqlError::Unsupported(format!("data type: {dt}"))),
3918    }
3919}
3920
3921fn object_name_to_string(name: &sp::ObjectName) -> String {
3922    name.0
3923        .iter()
3924        .filter_map(|p| match p {
3925            sp::ObjectNamePart::Identifier(ident) => Some(ident.value.clone()),
3926            _ => None,
3927        })
3928        .collect::<Vec<_>>()
3929        .join(".")
3930}
3931
3932#[cfg(test)]
3933#[path = "parser_tests.rs"]
3934mod tests;