Skip to main content

qusql_type/
schema.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5// http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13//! Parse and evaluate SQL schema definitions into a typed representation
14//! used for statement type-checking.
15//!
16//! Supports DDL statements (CREATE/ALTER/DROP for tables, views, functions,
17//! procedures, indices, and types) across MySQL/MariaDB, PostgreSQL/PostGIS,
18//! and SQLite dialects. Includes a limited schema-level evaluator that can
19//! interpret PL/pgSQL function bodies, DO blocks, IF/ELSE control flow,
20//! INSERT/DELETE/TRUNCATE for in-memory row tracking, and expressions
21//! (EXISTS, COALESCE, aggregates). PostgreSQL/PostGIS built-in schemas
22//! (e.g. `spatial_ref_sys`, `geometry_columns`) are injected automatically.
23//!
24//! ```
25//! use qusql_type::{schema::parse_schemas, TypeOptions, SQLDialect, Issues};
26//! let schemas = "
27//!     -- Table structure for table `events`
28//!     DROP TABLE IF EXISTS `events`;
29//!     CREATE TABLE `events` (
30//!       `id` bigint(20) NOT NULL,
31//!       `user` int(11) NOT NULL,
32//!       `event_key` int(11) NOT NULL,
33//!       `time` datetime NOT NULL
34//!     ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
35//!
36//!     -- Table structure for table `events_keys`
37//!     DROP TABLE IF EXISTS `event_keys`;
38//!     CREATE TABLE `event_keys` (
39//!       `id` int(11) NOT NULL,
40//!       `name` text NOT NULL
41//!     ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
42//!
43//!     -- Stand-in structure for view `events_view`
44//!     -- (See below for the actual view)
45//!     DROP VIEW IF EXISTS `events_view`;
46//!     CREATE TABLE `events_view` (
47//!         `id` int(11),
48//!         `user` int(11) NOT NULL,
49//!         `event_key` text NOT NULL,
50//!         `time` datetime NOT NULL
51//!     );
52//!
53//!     -- Indexes for table `events`
54//!     ALTER TABLE `events`
55//!       ADD PRIMARY KEY (`id`),
56//!       ADD KEY `time` (`time`),
57//!       ADD KEY `event_key` (`event_key`);
58//!
59//!     -- Indexes for table `event_keys`
60//!     ALTER TABLE `event_keys`
61//!       ADD PRIMARY KEY (`id`);
62//!
63//!     -- Constraints for table `events`
64//!     ALTER TABLE `events`
65//!       ADD CONSTRAINT `event_key` FOREIGN KEY (`event_key`) REFERENCES `event_keys` (`id`);
66//!
67//!     -- Structure for view `events_view`
68//!     DROP TABLE IF EXISTS `events_view`;
69//!     DROP VIEW IF EXISTS `events_view`;
70//!     CREATE ALGORITHM=UNDEFINED DEFINER=`phpmyadmin`@`localhost`
71//!         SQL SECURITY DEFINER VIEW `events_view` AS
72//!         SELECT
73//!             `events`.`id` AS `id`,
74//!             `events`.`user` AS `user`,
75//!             `event_keys`.`name` AS `event_key`,
76//!             `events`.`time` AS `time`
77//!         FROM `events`, `event_keys`
78//!         WHERE `events`.`event_key` = `event_keys`.`id`;
79//!     ";
80//!
81//! let mut issues = Issues::new(schemas);
82//! let schemas = parse_schemas(schemas,
83//!     &mut issues,
84//!     &TypeOptions::new().dialect(SQLDialect::MariaDB));
85//!
86//! assert!(issues.is_ok());
87//!
88//! for (name, schema) in schemas.schemas {
89//!     println!("{name}: {schema:?}")
90//! }
91//! ```
92
93use crate::{
94    Type, TypeOptions,
95    type_::{BaseType, FullType},
96    type_statement,
97    typer::unqualified_name,
98};
99use alloc::{borrow::Cow, boxed::Box, collections::BTreeMap, rc::Rc, sync::Arc, vec::Vec};
100use qusql_parse::{
101    AddColumn, AddIndex, AlterColumn, DataType, DataTypeProperty, DropColumn, Expression,
102    FunctionParam, Identifier, IdentifierPart, Issues, ModifyColumn, Span, Spanned, Statement,
103    parse_statements,
104};
105
106/// A column in a schema
107#[derive(Debug, Clone)]
108pub struct Column<'a> {
109    pub identifier: Identifier<'a>,
110    /// Type of the column
111    pub type_: FullType<'a>,
112    /// True if the column is auto_increment
113    pub auto_increment: bool,
114    pub default: bool,
115    pub as_: Option<Expression<'a>>,
116    pub generated: bool,
117}
118
119/// Schema representing a table or view
120#[derive(Debug)]
121pub struct Schema<'a> {
122    /// Span of identifier
123    pub identifier_span: Span,
124    /// List of columns
125    pub columns: Vec<Column<'a>>,
126    /// True if this is a view instead of a table
127    pub view: bool,
128}
129
130impl<'a> Schema<'a> {
131    pub fn get_column(&self, identifier: &str) -> Option<&Column<'a>> {
132        self.columns
133            .iter()
134            .find(|&column| column.identifier.value == identifier)
135    }
136    pub fn get_column_mut(&mut self, identifier: &str) -> Option<&mut Column<'a>> {
137        self.columns
138            .iter_mut()
139            .find(|column| column.identifier.value == identifier)
140    }
141}
142
143/// A stored procedure definition
144#[derive(Debug, Clone)]
145pub struct ProcedureDef<'a> {
146    pub name: Identifier<'a>,
147    pub params: Vec<FunctionParam<'a>>,
148    pub span: Span,
149    /// Statements extracted from the procedure body (if the body was a BEGIN...END block)
150    pub body: Option<Vec<Statement<'a>>>,
151}
152
153/// Parsed body of a stored function, with an offset for mapping spans
154/// back to the outer source file.
155#[derive(Debug)]
156pub struct FunctionDefBody<'a> {
157    /// Parsed statements from the function body
158    pub statements: Vec<Statement<'a>>,
159    /// The body source string (borrowed from the outer source)
160    pub src: &'a str,
161}
162
163/// A stored function definition
164#[derive(Debug)]
165pub struct FunctionDef<'a> {
166    pub name: Identifier<'a>,
167    pub params: Vec<FunctionParam<'a>>,
168    pub return_type: DataType<'a>,
169    pub span: Span,
170    /// Parsed body, present when the function was defined with a
171    /// dollar-quoted (non-escaped) AS body string.
172    pub body: Option<FunctionDefBody<'a>>,
173}
174
175#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
176pub struct IndexKey<'a> {
177    pub table: Option<Identifier<'a>>,
178    pub index: Identifier<'a>,
179}
180
181/// A user-defined type registered via `CREATE TYPE`
182#[derive(Debug)]
183pub enum TypeDef<'a> {
184    /// A PostgreSQL enum type
185    Enum {
186        values: Arc<Vec<Cow<'a, str>>>,
187        span: Span,
188    },
189}
190
191/// A description of tables, view, procedures and function in a schemas definition file
192#[derive(Debug, Default)]
193pub struct Schemas<'a> {
194    /// Map from name to Tables or views
195    pub schemas: BTreeMap<Identifier<'a>, Schema<'a>>,
196    /// Map from name to procedure
197    pub procedures: BTreeMap<Identifier<'a>, ProcedureDef<'a>>,
198    /// Map from name to function
199    pub functions: BTreeMap<Identifier<'a>, FunctionDef<'a>>,
200    /// Map from (table, index) to location
201    pub indices: BTreeMap<IndexKey<'a>, Span>,
202    /// Map from type name to type definition (e.g. enums created with `CREATE TYPE ... AS ENUM`)
203    pub types: BTreeMap<Identifier<'a>, TypeDef<'a>>,
204}
205
206/// Try to parse a borrowed string as SQL statements.
207/// Returns the parsed body if the string is a non-escaped borrow from `src`,
208/// or None if the string is escaped (Cow::Owned).
209fn try_parse_body<'a>(
210    src: &'a str,
211    body_str: &qusql_parse::SString<'a>,
212    issues: &mut Issues<'a>,
213    options: &qusql_parse::ParseOptions,
214) -> Option<FunctionDefBody<'a>> {
215    let Cow::Borrowed(borrowed) = &body_str.value else {
216        return None;
217    };
218    let span_offset = borrowed.as_ptr() as usize - src.as_ptr() as usize;
219    let body_options = options.clone().function_body(true).span_offset(span_offset);
220    let statements = parse_statements(borrowed, issues, &body_options);
221    Some(FunctionDefBody {
222        statements,
223        src: borrowed,
224    })
225}
226
227fn type_kind_from_parse<'a>(
228    type_: qusql_parse::Type<'a>,
229    unsigned: bool,
230    is_sqlite: bool,
231    types: Option<&BTreeMap<Identifier<'a>, TypeDef<'a>>>,
232    issues: &mut Issues<'a>,
233) -> Type<'a> {
234    match type_ {
235        qusql_parse::Type::TinyInt(v) => {
236            if !unsigned && matches!(v, Some((1, _))) {
237                BaseType::Bool.into()
238            } else if unsigned {
239                Type::U8
240            } else {
241                Type::I8
242            }
243        }
244        qusql_parse::Type::SmallInt(_) => {
245            if unsigned {
246                Type::U16
247            } else {
248                Type::I16
249            }
250        }
251        qusql_parse::Type::MediumInt(_) => {
252            if unsigned {
253                Type::U24
254            } else {
255                Type::I24
256            }
257        }
258        qusql_parse::Type::Int(_) => {
259            if unsigned {
260                Type::U32
261            } else {
262                Type::I32
263            }
264        }
265        qusql_parse::Type::BigInt(_) => {
266            if unsigned {
267                Type::U64
268            } else {
269                Type::I64
270            }
271        }
272        qusql_parse::Type::Char(_) => BaseType::String.into(),
273        qusql_parse::Type::VarChar(_) => BaseType::String.into(),
274        qusql_parse::Type::TinyText(_) => BaseType::String.into(),
275        qusql_parse::Type::MediumText(_) => BaseType::String.into(),
276        qusql_parse::Type::Text(_) => BaseType::String.into(),
277        qusql_parse::Type::LongText(_) => BaseType::String.into(),
278        qusql_parse::Type::Enum(e) => {
279            Type::Enum(Arc::new(e.into_iter().map(|s| s.value).collect()))
280        }
281        qusql_parse::Type::Set(s) => Type::Set(Arc::new(s.into_iter().map(|s| s.value).collect())),
282        qusql_parse::Type::Float(_) => Type::F32,
283        qusql_parse::Type::Double(_) => Type::F64,
284        qusql_parse::Type::DateTime(_) => BaseType::DateTime.into(),
285        qusql_parse::Type::Timestamp(_) => BaseType::TimeStamp.into(),
286        qusql_parse::Type::Time(_) => BaseType::Time.into(),
287        qusql_parse::Type::TinyBlob(_) => BaseType::Bytes.into(),
288        qusql_parse::Type::MediumBlob(_) => BaseType::Bytes.into(),
289        qusql_parse::Type::Date => BaseType::Date.into(),
290        qusql_parse::Type::Blob(_) => BaseType::Bytes.into(),
291        qusql_parse::Type::LongBlob(_) => BaseType::Bytes.into(),
292        qusql_parse::Type::VarBinary(_) => BaseType::Bytes.into(),
293        qusql_parse::Type::Binary(_) => BaseType::Bytes.into(),
294        qusql_parse::Type::Boolean => BaseType::Bool.into(),
295        qusql_parse::Type::Integer(_) => {
296            if is_sqlite {
297                BaseType::Integer.into()
298            } else {
299                Type::I32
300            }
301        }
302        qusql_parse::Type::Float8 => BaseType::Float.into(),
303        qusql_parse::Type::Numeric(ref v) => {
304            let span = v.as_ref().map(|(_, _, s)| s.clone()).unwrap_or(0..0);
305            issues.err("NUMERIC type is not yet supported", &span);
306            BaseType::Float.into()
307        }
308        qusql_parse::Type::Decimal(ref v) => {
309            let span = v.as_ref().map(|(_, _, s)| s.clone()).unwrap_or(0..0);
310            issues.err("DECIMAL type is not yet supported", &span);
311            BaseType::Float.into()
312        }
313        qusql_parse::Type::Timestamptz => BaseType::TimeStamp.into(),
314        qusql_parse::Type::Json => BaseType::String.into(),
315        qusql_parse::Type::Jsonb => BaseType::String.into(),
316        qusql_parse::Type::Bit(_, _) => BaseType::Bytes.into(),
317        qusql_parse::Type::VarBit(_) => BaseType::Bytes.into(),
318        qusql_parse::Type::Bytea => BaseType::Bytes.into(),
319        qusql_parse::Type::Named(qname) => {
320            // Look up user-defined types (e.g. enums created with CREATE TYPE ... AS ENUM).
321            // Only unqualified names are looked up; schema-qualified names (e.g. public.mytype)
322            // are not stored with a prefix in the types map.
323            if let Some(types) = types
324                && qname.prefix.is_empty()
325                && let Some(TypeDef::Enum { values, .. }) = types.get(qname.identifier.value)
326            {
327                Type::Enum(values.clone())
328            } else {
329                BaseType::String.into()
330            }
331        }
332        qusql_parse::Type::Inet4 => BaseType::String.into(),
333        qusql_parse::Type::Inet6 => BaseType::String.into(),
334        qusql_parse::Type::InetAddr => BaseType::String.into(),
335        qusql_parse::Type::Cidr => BaseType::String.into(),
336        qusql_parse::Type::Macaddr => BaseType::String.into(),
337        qusql_parse::Type::Macaddr8 => BaseType::String.into(),
338        qusql_parse::Type::Array(inner, _) => Type::Array(Box::new(type_kind_from_parse(
339            *inner, false, is_sqlite, types, issues,
340        ))),
341        qusql_parse::Type::Table(ref span, _) => {
342            issues.err("TABLE type is not yet supported", span);
343            BaseType::String.into()
344        }
345        qusql_parse::Type::Serial => Type::I32,
346        qusql_parse::Type::SmallSerial => Type::I16,
347        qusql_parse::Type::BigSerial => Type::I64,
348        qusql_parse::Type::Money => BaseType::Float.into(),
349        qusql_parse::Type::Timetz(_) => BaseType::Time.into(),
350        qusql_parse::Type::Interval(_) => BaseType::TimeInterval.into(),
351        qusql_parse::Type::TsQuery => BaseType::String.into(),
352        qusql_parse::Type::TsVector => BaseType::String.into(),
353        qusql_parse::Type::Uuid => BaseType::Uuid.into(),
354        qusql_parse::Type::Xml => BaseType::String.into(),
355        qusql_parse::Type::Range(sub) | qusql_parse::Type::MultiRange(sub) => {
356            use qusql_parse::RangeSubtype;
357            let elem = match sub {
358                RangeSubtype::Int4 => BaseType::Integer,
359                RangeSubtype::Int8 => BaseType::Integer,
360                RangeSubtype::Num => BaseType::Float,
361                RangeSubtype::Ts => BaseType::DateTime,
362                RangeSubtype::Tstz => BaseType::TimeStamp,
363                RangeSubtype::Date => BaseType::Date,
364            };
365            Type::Range(elem)
366        }
367        qusql_parse::Type::Point
368        | qusql_parse::Type::Line
369        | qusql_parse::Type::Lseg
370        | qusql_parse::Type::Box
371        | qusql_parse::Type::Path
372        | qusql_parse::Type::Polygon
373        | qusql_parse::Type::Circle => Type::Geometry,
374    }
375}
376
377pub(crate) fn parse_column<'a>(
378    data_type: DataType<'a>,
379    identifier: Identifier<'a>,
380    _issues: &mut Issues<'a>,
381    options: Option<&TypeOptions>,
382    types: Option<&BTreeMap<Identifier<'a>, TypeDef<'a>>>,
383) -> Column<'a> {
384    let mut not_null = false;
385    let mut unsigned = false;
386    let mut auto_increment = false;
387    let mut default = false;
388    let mut as_ = None;
389    let mut generated = false;
390    let mut primary_key = false;
391    let is_sqlite = options
392        .map(|v| v.parse_options.get_dialect().is_sqlite())
393        .unwrap_or_default();
394    for p in data_type.properties {
395        match p {
396            DataTypeProperty::Signed(_) => unsigned = false,
397            DataTypeProperty::Unsigned(_) => unsigned = true,
398            DataTypeProperty::Null(_) => not_null = false,
399            DataTypeProperty::NotNull(_) => not_null = true,
400            DataTypeProperty::AutoIncrement(_) => auto_increment = true,
401            DataTypeProperty::As((_, e)) => as_ = Some(e),
402            DataTypeProperty::Default(_) => default = true,
403            DataTypeProperty::GeneratedAlways(_) => generated = true,
404            DataTypeProperty::GeneratedAlwaysAsExpr { .. } => generated = true,
405            DataTypeProperty::PrimaryKey(_) => primary_key = true,
406            _ => {}
407        }
408    }
409    // SQLite INTEGER PRIMARY KEY is an alias for rowid (auto-increment)
410    if is_sqlite && primary_key && matches!(data_type.type_, qusql_parse::Type::Integer(_)) {
411        auto_increment = true;
412    }
413    // PRIMARY KEY implies NOT NULL
414    if primary_key {
415        not_null = true;
416    }
417    let type_ = type_kind_from_parse(data_type.type_, unsigned, is_sqlite, types, _issues);
418    Column {
419        identifier,
420        type_: FullType {
421            t: type_,
422            not_null,
423            list_hack: false,
424        },
425        auto_increment,
426        as_,
427        default,
428        generated,
429    }
430}
431
/// A value computed while evaluating schema statements.
#[derive(Clone, Debug, PartialEq)]
enum SqlValue<'a> {
    Null,
    Bool(bool),
    Integer(i64),
    /// Text that is a slice of the SQL source, so span arithmetic still works.
    SourceText(&'a str),
    /// Text that was computed and is therefore owned.
    OwnedText(alloc::string::String),
}

impl<'a> SqlValue<'a> {
    /// The source slice of a `SourceText` value; `None` for every other variant.
    fn as_source_text(&self) -> Option<&'a str> {
        match self {
            SqlValue::SourceText(text) => Some(*text),
            _ => None,
        }
    }

    /// Whether the value counts as true in a condition: `NULL`, `false` and
    /// the integer zero do not, everything else (including any text) does.
    fn is_truthy(&self) -> bool {
        match self {
            SqlValue::Null => false,
            SqlValue::Bool(flag) => *flag,
            SqlValue::Integer(number) => *number != 0,
            SqlValue::SourceText(_) | SqlValue::OwnedText(_) => true,
        }
    }

    /// Equality of two values. Returns `None` when either side is `NULL` or
    /// when the two values are of different kinds; source and owned text are
    /// compared by content.
    fn sql_eq(&self, other: &SqlValue<'a>) -> Option<bool> {
        // View either text variant as a plain string slice.
        fn text_of<'t, 'v>(value: &'t SqlValue<'v>) -> Option<&'t str> {
            match value {
                SqlValue::SourceText(text) => Some(text),
                SqlValue::OwnedText(text) => Some(text.as_str()),
                _ => None,
            }
        }

        match (self, other) {
            (SqlValue::Null, _) | (_, SqlValue::Null) => None,
            (SqlValue::Bool(lhs), SqlValue::Bool(rhs)) => Some(lhs == rhs),
            (SqlValue::Integer(lhs), SqlValue::Integer(rhs)) => Some(lhs == rhs),
            (lhs, rhs) => match (text_of(lhs), text_of(rhs)) {
                (Some(left), Some(right)) => Some(left == right),
                _ => None,
            },
        }
    }

    /// `self <= other`, defined only for two integers; `None` otherwise.
    fn sql_lte(&self, other: &SqlValue<'a>) -> Option<bool> {
        match (self, other) {
            (SqlValue::Integer(lhs), SqlValue::Integer(rhs)) => Some(lhs <= rhs),
            _ => None,
        }
    }
}
483
484/// A single row of evaluated column values.
485type Row<'a> = Rc<Vec<(&'a str, SqlValue<'a>)>>;
486
487/// Processing context for schema evaluation: holds mutable schema state, issue
488/// sink, the source text for span-offset calculations, and parse/type options.
489struct SchemaCtx<'a, 'b> {
490    schemas: &'b mut Schemas<'a>,
491    issues: &'b mut Issues<'a>,
492    /// The source text slice that all spans inside `issues` refer to.
493    src: &'a str,
494    options: &'b TypeOptions,
495    /// Active function argument bindings: parameter name -> SQL value.
496    /// Set when evaluating a known function body.
497    bindings: BTreeMap<&'a str, SqlValue<'a>>,
498    /// In-memory row store for tables populated during schema evaluation.
499    rows: BTreeMap<&'a str, Vec<Row<'a>>>,
500    /// Table rows made available to aggregate functions during eval_condition.
501    /// Temporarily swapped via core::mem::take so eval functions can take &mut self.
502    current_table_rows: Vec<Row<'a>>,
503    /// The row currently being evaluated (e.g. during WHERE clause filtering).
504    /// Set by eval_select_matching_rows around each row's eval call.
505    current_row: Option<Row<'a>>,
506    /// The return value of the most recently executed RETURN statement.
507    /// Set by the Return arm in process_statement; consumed by eval_function_expr.
508    return_value: Option<SqlValue<'a>>,
509}
510
511impl<'a, 'b> SchemaCtx<'a, 'b> {
512    fn new(
513        schemas: &'b mut Schemas<'a>,
514        issues: &'b mut Issues<'a>,
515        src: &'a str,
516        options: &'b TypeOptions,
517    ) -> Self {
518        Self {
519            schemas,
520            issues,
521            src,
522            options,
523            bindings: Default::default(),
524            rows: Default::default(),
525            current_table_rows: Default::default(),
526            current_row: Default::default(),
527            return_value: None,
528        }
529    }
530
531    /// Process a list of top-level schema statements.  Each statement is
532    /// independent: errors in one do not stop processing of the next.
533    fn process_top_level_statements(&mut self, statements: Vec<qusql_parse::Statement<'a>>) {
534        for statement in statements {
535            let _ = self.process_statement(statement);
536        }
537    }
538
539    /// Process a list of statements in a block or function body, stopping at
540    /// the first `Err` (error or `RETURN`).
541    fn process_statements(
542        &mut self,
543        statements: Vec<qusql_parse::Statement<'a>>,
544    ) -> Result<(), ()> {
545        for statement in statements {
546            self.process_statement(statement)?;
547        }
548        Ok(())
549    }
550
551    fn process_statement(&mut self, statement: qusql_parse::Statement<'a>) -> Result<(), ()> {
552        match statement {
553            qusql_parse::Statement::CreateTable(t) => {
554                self.process_create_table(*t);
555                Ok(())
556            }
557            qusql_parse::Statement::CreateView(v) => {
558                self.process_create_view(*v);
559                Ok(())
560            }
561            qusql_parse::Statement::CreateFunction(f) => {
562                self.process_create_function(*f);
563                Ok(())
564            }
565            qusql_parse::Statement::CreateProcedure(p) => {
566                self.process_create_procedure(*p);
567                Ok(())
568            }
569            qusql_parse::Statement::CreateIndex(ci) => {
570                self.process_create_index(*ci);
571                Ok(())
572            }
573            qusql_parse::Statement::CreateTrigger(_) => Ok(()),
574            qusql_parse::Statement::CreateTypeEnum(s) => {
575                self.process_create_type_enum(*s);
576                Ok(())
577            }
578            qusql_parse::Statement::AlterTable(a) => {
579                self.process_alter_table(*a);
580                Ok(())
581            }
582            qusql_parse::Statement::DropTable(t) => {
583                self.process_drop_table(*t);
584                Ok(())
585            }
586            qusql_parse::Statement::DropView(v) => {
587                self.process_drop_view(*v);
588                Ok(())
589            }
590            qusql_parse::Statement::DropFunction(f) => {
591                self.process_drop_function(*f);
592                Ok(())
593            }
594            qusql_parse::Statement::DropProcedure(p) => {
595                self.process_drop_procedure(*p);
596                Ok(())
597            }
598            qusql_parse::Statement::DropIndex(ci) => {
599                self.process_drop_index(*ci);
600                Ok(())
601            }
602            qusql_parse::Statement::DropDatabase(s) => {
603                self.issues.err("not implemented", &s);
604                Err(())
605            }
606            qusql_parse::Statement::DropServer(s) => {
607                self.issues.err("not implemented", &s);
608                Err(())
609            }
610            qusql_parse::Statement::DropTrigger(_) => Ok(()),
611            qusql_parse::Statement::DropType(s) => {
612                self.process_drop_type(*s);
613                Ok(())
614            }
615            // Control-flow: recurse into all reachable branches.
616            qusql_parse::Statement::Do(d) => self.process_do(*d),
617            qusql_parse::Statement::Block(b) => self.process_statements(b.statements),
618            qusql_parse::Statement::If(i) => self.process_if(*i),
619            // SELECT: may call a known function whose body we can evaluate.
620            qusql_parse::Statement::Select(s) => self.process_select(*s),
621            // DML: track row insertions so conditions like EXISTS(...) can be evaluated.
622            qusql_parse::Statement::InsertReplace(i) => self.process_insert(*i),
623            // Transaction control: we assume all transactions commit.
624            qusql_parse::Statement::Commit(_) => Ok(()),
625            qusql_parse::Statement::Begin(_) => Ok(()),
626            // Statements with no schema effect.
627            qusql_parse::Statement::Grant(_) => Ok(()),
628            qusql_parse::Statement::CommentOn(_) => Ok(()),
629            qusql_parse::Statement::Analyze(_) => Ok(()),
630            // Variable / cursor plumbing — no schema effect.
631            qusql_parse::Statement::Set(_) => Ok(()),
632            // Assign and Perform may call known functions with schema effects.
633            qusql_parse::Statement::Assign(a) => {
634                for se in a.value.select_exprs {
635                    self.process_expression(se.expr)?;
636                }
637                Ok(())
638            }
639            qusql_parse::Statement::Perform(p) => self.process_expression(p.expr),
640            qusql_parse::Statement::DeclareVariable(d) => {
641                // Evaluate the DEFAULT expression for its side effects (may call user-defined functions).
642                if let Some((_, select)) = d.default {
643                    self.eval_condition(&select)?;
644                }
645                Ok(())
646            }
647            // DeclareHandler bodies only run on error; we model the happy path, so skip them.
648            qusql_parse::Statement::DeclareHandler(_) => Ok(()),
649            qusql_parse::Statement::ExecuteFunction(s) => {
650                self.issues.err("not implemented", &s);
651                Err(())
652            }
653            // RAISE EXCEPTION aborts execution; anything else is a log/notice with no schema effect.
654            qusql_parse::Statement::Raise(r) => {
655                if matches!(r.level, Some(qusql_parse::RaiseLevel::Exception(_))) {
656                    Err(())
657                } else {
658                    Ok(())
659                }
660            }
661            qusql_parse::Statement::Return(r) => {
662                self.return_value = self.eval_expr(&r.expr).ok();
663                Err(())
664            }
665            qusql_parse::Statement::PlpgsqlExecute(e) => self.process_plpgsql_execute(*e),
666            qusql_parse::Statement::Update(u) => self.process_update(*u),
667            qusql_parse::Statement::Delete(d) => self.process_delete(*d),
668            qusql_parse::Statement::AlterType(a) => {
669                self.process_alter_type(*a);
670                Ok(())
671            }
672            qusql_parse::Statement::TruncateTable(t) => {
673                self.process_truncate_table(*t);
674                Ok(())
675            }
676            qusql_parse::Statement::RenameTable(r) => {
677                self.process_rename_table(*r);
678                Ok(())
679            }
680            qusql_parse::Statement::Call(c) => self.process_call(*c),
681            s => {
682                self.issues.err(
683                    alloc::format!("Unsupported statement {s:?} in schema definition"),
684                    &s,
685                );
686                Err(())
687            }
688        }
689    }
690
691    fn process_create_table(&mut self, t: qusql_parse::CreateTable<'a>) {
692        let mut replace = false;
693        let id = unqualified_name(self.issues, &t.identifier);
694        let mut schema = Schema {
695            view: false,
696            identifier_span: id.span.clone(),
697            columns: Default::default(),
698        };
699        for o in t.create_options {
700            match o {
701                qusql_parse::CreateOption::OrReplace(_) => replace = true,
702                qusql_parse::CreateOption::Temporary { temporary_span, .. } => {
703                    self.issues.err("Not supported", &temporary_span);
704                }
705                qusql_parse::CreateOption::Materialized(s) => {
706                    self.issues.err("Not supported", &s);
707                }
708                qusql_parse::CreateOption::Concurrently(s) => {
709                    self.issues.err("Not supported", &s);
710                }
711                qusql_parse::CreateOption::Unique(s) => {
712                    self.issues.err("Not supported", &s);
713                }
714                _ => {}
715            }
716        }
717        for d in t.create_definitions {
718            match d {
719                qusql_parse::CreateDefinition::ColumnDefinition {
720                    identifier,
721                    data_type,
722                } => {
723                    let column = parse_column(
724                        data_type,
725                        identifier.clone(),
726                        self.issues,
727                        Some(self.options),
728                        Some(&self.schemas.types),
729                    );
730                    if let Some(oc) = schema.get_column(column.identifier.value) {
731                        self.issues
732                            .err("Column already defined", &identifier)
733                            .frag("Defined here", &oc.identifier);
734                    } else {
735                        schema.columns.push(column);
736                    }
737                }
738                qusql_parse::CreateDefinition::IndexDefinition {
739                    index_type,
740                    index_name,
741                    cols,
742                    ..
743                } => {
744                    // Validate that every column referenced by the index exists.
745                    for col in &cols {
746                        if let qusql_parse::IndexColExpr::Column(cname) = &col.expr
747                            && schema.get_column(cname.value).is_none()
748                        {
749                            self.issues
750                                .err("No such column in table", col)
751                                .frag("Table defined here", &schema.identifier_span);
752                        }
753                    }
754                    // PRIMARY KEY implies NOT NULL on each listed column.
755                    if matches!(index_type, qusql_parse::IndexType::Primary(_)) {
756                        for col in &cols {
757                            if let qusql_parse::IndexColExpr::Column(cname) = &col.expr
758                                && let Some(c) = schema.get_column_mut(cname.value)
759                            {
760                                c.type_.not_null = true;
761                            }
762                        }
763                    }
764                    // Register named indices.
765                    if let Some(name) = index_name {
766                        let ident = if self.options.parse_options.get_dialect().is_postgresql() {
767                            IndexKey {
768                                table: None,
769                                index: name.clone(),
770                            }
771                        } else {
772                            IndexKey {
773                                table: Some(id.clone()),
774                                index: name.clone(),
775                            }
776                        };
777                        let span = name.span();
778                        if let Some(old) = self.schemas.indices.insert(ident, span) {
779                            self.issues
780                                .err("Multiple indices with the same identifier", &name)
781                                .frag("Already defined here", &old);
782                        }
783                    }
784                }
785                qusql_parse::CreateDefinition::ForeignKeyDefinition { .. } => {}
786                qusql_parse::CreateDefinition::CheckConstraintDefinition { .. } => {}
787                qusql_parse::CreateDefinition::LikeTable { source_table, .. } => {
788                    let source_id = unqualified_name(self.issues, &source_table);
789                    if let Some(src) = self.schemas.schemas.get(source_id) {
790                        let cols: Vec<Column<'a>> = src.columns.to_vec();
791                        for col in cols {
792                            if schema.get_column(col.identifier.value).is_none() {
793                                schema.columns.push(col);
794                            }
795                        }
796                    } else {
797                        self.issues.err("Table not found", &source_table);
798                    }
799                }
800            }
801        }
802        match self.schemas.schemas.entry(id.clone()) {
803            alloc::collections::btree_map::Entry::Occupied(mut e) => {
804                if replace {
805                    e.insert(schema);
806                } else if t.if_not_exists.is_none() {
807                    self.issues
808                        .err("Table already defined", &t.identifier)
809                        .frag("Defined here", &e.get().identifier_span);
810                }
811            }
812            alloc::collections::btree_map::Entry::Vacant(e) => {
813                e.insert(schema);
814            }
815        }
816    }
817
818    fn process_create_view(&mut self, v: qusql_parse::CreateView<'a>) {
819        let mut replace = false;
820        let mut schema = Schema {
821            view: true,
822            identifier_span: v.name.span(),
823            columns: Default::default(),
824        };
825        for o in v.create_options {
826            match o {
827                qusql_parse::CreateOption::OrReplace(_) => replace = true,
828                qusql_parse::CreateOption::Temporary { temporary_span, .. } => {
829                    self.issues.err("Not supported", &temporary_span);
830                }
831                qusql_parse::CreateOption::Materialized(s) => {
832                    self.issues.err("Not supported", &s);
833                }
834                qusql_parse::CreateOption::Concurrently(s) => {
835                    self.issues.err("Not supported", &s);
836                }
837                qusql_parse::CreateOption::Unique(s) => {
838                    self.issues.err("Not supported", &s);
839                }
840                _ => {}
841            }
842        }
843        {
844            let mut typer: crate::typer::Typer<'a, '_> = crate::typer::Typer {
845                schemas: self.schemas,
846                issues: self.issues,
847                reference_types: Vec::new(),
848                outer_reference_types: Vec::new(),
849                arg_types: Default::default(),
850                options: self.options,
851                with_schemas: Default::default(),
852            };
853            let t = type_statement::type_statement(&mut typer, &v.select);
854            let s = if let type_statement::InnerStatementType::Select(s) = t {
855                s
856            } else {
857                self.issues.err("Not supported", &v.select.span());
858                return;
859            };
860            for column in s.columns {
861                let Some(name) = column.name else {
862                    self.issues.err(
863                        "View column has no name; add an alias with AS",
864                        &v.select.span(),
865                    );
866                    continue;
867                };
868                schema.columns.push(Column {
869                    identifier: name,
870                    type_: column.type_,
871                    auto_increment: false,
872                    default: false,
873                    as_: None,
874                    generated: false,
875                });
876            }
877        }
878        match self
879            .schemas
880            .schemas
881            .entry(unqualified_name(self.issues, &v.name).clone())
882        {
883            alloc::collections::btree_map::Entry::Occupied(mut e) => {
884                if replace {
885                    e.insert(schema);
886                } else if v.if_not_exists.is_none() {
887                    self.issues
888                        .err("View already defined", &v.name)
889                        .frag("Defined here", &e.get().identifier_span);
890                }
891            }
892            alloc::collections::btree_map::Entry::Vacant(e) => {
893                e.insert(schema);
894            }
895        }
896    }
897
898    fn process_create_function(&mut self, f: qusql_parse::CreateFunction<'a>) {
899        let mut replace = false;
900        for o in &f.create_options {
901            if matches!(o, qusql_parse::CreateOption::OrReplace(_)) {
902                replace = true;
903            }
904        }
905        let body = f
906            .body
907            .as_ref()
908            .and_then(|b| b.strings.first())
909            .and_then(|s| try_parse_body(self.src, s, self.issues, &self.options.parse_options));
910        let name = f.name.clone();
911        let def = FunctionDef {
912            name: f.name.clone(),
913            params: f.params,
914            return_type: f.return_type,
915            span: f.create_span.join_span(&f.function_span),
916            body,
917        };
918        match self.schemas.functions.entry(name) {
919            alloc::collections::btree_map::Entry::Occupied(mut e) => {
920                if replace {
921                    e.insert(def);
922                } else if f.if_not_exists.is_none() {
923                    self.issues
924                        .err("Function already defined", &f.name)
925                        .frag("Defined here", &e.get().span);
926                }
927            }
928            alloc::collections::btree_map::Entry::Vacant(e) => {
929                e.insert(def);
930            }
931        }
932    }
933
934    fn process_create_procedure(&mut self, p: qusql_parse::CreateProcedure<'a>) {
935        let mut replace = false;
936        for o in &p.create_options {
937            if matches!(o, qusql_parse::CreateOption::OrReplace(_)) {
938                replace = true;
939            }
940        }
941        let name = p.name.clone();
942        let body = p.body.map(|stmt| match stmt {
943            qusql_parse::Statement::Block(b) => b.statements,
944            other => alloc::vec![other],
945        });
946        let def = ProcedureDef {
947            name: p.name.clone(),
948            params: p.params,
949            span: p.create_span.join_span(&p.procedure_span),
950            body,
951        };
952        match self.schemas.procedures.entry(name) {
953            alloc::collections::btree_map::Entry::Occupied(mut e) => {
954                if replace {
955                    e.insert(def);
956                } else if p.if_not_exists.is_none() {
957                    self.issues
958                        .err("Procedure already defined", &p.name)
959                        .frag("Defined here", &e.get().span);
960                }
961            }
962            alloc::collections::btree_map::Entry::Vacant(e) => {
963                e.insert(def);
964            }
965        }
966    }
967
968    fn process_call(&mut self, c: qusql_parse::Call<'a>) -> Result<(), ()> {
969        let proc_name = c.name.identifier.value;
970        // Look up the procedure and clone its body statements.
971        let body = self
972            .schemas
973            .procedures
974            .values()
975            .find(|p| p.name.value == proc_name)
976            .and_then(|p| p.body.clone());
977        let Some(statements) = body else {
978            // Unknown or body-less procedure — no schema effect.
979            return Ok(());
980        };
981        // Ignore Ok/Err from the body: a Err() just means "stopped early" (RAISE EXCEPTION, RETURN, etc.)
982        let _ = self.process_statements(statements);
983        Ok(())
984    }
985
986    fn process_create_index(&mut self, ci: qusql_parse::CreateIndex<'a>) {
987        let t = unqualified_name(self.issues, &ci.table_name);
988        if let Some(table) = self.schemas.schemas.get(t) {
989            for col in &ci.column_names {
990                if let qusql_parse::IndexColExpr::Column(name) = &col.expr
991                    && table.get_column(name.value).is_none()
992                {
993                    self.issues
994                        .err("No such column in table", col)
995                        .frag("Table defined here", &table.identifier_span);
996                }
997            }
998        } else {
999            self.issues.err("No such table", &ci.table_name);
1000        }
1001        let index_name = match &ci.index_name {
1002            Some(name) => name.clone(),
1003            None => return,
1004        };
1005        let ident = if self.options.parse_options.get_dialect().is_postgresql() {
1006            IndexKey {
1007                table: None,
1008                index: index_name.clone(),
1009            }
1010        } else {
1011            IndexKey {
1012                table: Some(t.clone()),
1013                index: index_name.clone(),
1014            }
1015        };
1016        if let Some(old) = self.schemas.indices.insert(ident, ci.span())
1017            && ci.if_not_exists.is_none()
1018        {
1019            self.issues
1020                .err("Multiple indices with the same identifier", &ci)
1021                .frag("Already defined here", &old);
1022        }
1023    }
1024
1025    fn process_create_type_enum(&mut self, s: qusql_parse::CreateTypeEnum<'a>) {
1026        let name = unqualified_name(self.issues, &s.name);
1027        let mut replace = false;
1028        for o in &s.create_options {
1029            if matches!(o, qusql_parse::CreateOption::OrReplace(_)) {
1030                replace = true;
1031            }
1032        }
1033        let values = Arc::new(s.values.into_iter().map(|v| v.value).collect());
1034        let typedef = TypeDef::Enum {
1035            values,
1036            span: s.as_enum_span,
1037        };
1038        match self.schemas.types.entry(name.clone()) {
1039            alloc::collections::btree_map::Entry::Occupied(mut e) => {
1040                if replace {
1041                    e.insert(typedef);
1042                }
1043                // Otherwise silently skip - SQL uses EXCEPTION WHEN duplicate_object to handle re-runs
1044            }
1045            alloc::collections::btree_map::Entry::Vacant(e) => {
1046                e.insert(typedef);
1047            }
1048        }
1049    }
1050
1051    fn process_drop_type(&mut self, s: qusql_parse::DropType<'a>) {
1052        let if_exists = s.if_exists;
1053        for name in s.names {
1054            let id = unqualified_name(self.issues, &name);
1055            if self.schemas.types.remove(id).is_none() && if_exists.is_none() {
1056                self.issues.err("Type not found", &name);
1057            }
1058        }
1059    }
1060
1061    fn process_alter_table(&mut self, a: qusql_parse::AlterTable<'a>) {
1062        let e = match self
1063            .schemas
1064            .schemas
1065            .entry(unqualified_name(self.issues, &a.table).clone())
1066        {
1067            alloc::collections::btree_map::Entry::Occupied(e) => {
1068                let e = e.into_mut();
1069                if e.view {
1070                    self.issues.err("Cannot alter view", &a.table);
1071                    return;
1072                }
1073                e
1074            }
1075            alloc::collections::btree_map::Entry::Vacant(_) => {
1076                if a.if_exists.is_none() {
1077                    self.issues.err("Table not found", &a.table);
1078                }
1079                return;
1080            }
1081        };
1082        for s in a.alter_specifications {
1083            process_alter_specification(
1084                s,
1085                e,
1086                &a.table,
1087                self.issues,
1088                &mut self.schemas.indices,
1089                self.options,
1090                &self.schemas.types,
1091            );
1092        }
1093    }
1094}
1095
1096#[allow(clippy::too_many_lines)]
1097fn process_alter_specification<'a>(
1098    s: qusql_parse::AlterSpecification<'a>,
1099    e: &mut Schema<'a>,
1100    table_ref: &qusql_parse::QualifiedName<'a>,
1101    issues: &mut Issues<'a>,
1102    indices: &mut alloc::collections::BTreeMap<IndexKey<'a>, Span>,
1103    options: &TypeOptions,
1104    types: &BTreeMap<Identifier<'a>, TypeDef<'a>>,
1105) {
1106    match s {
1107        qusql_parse::AlterSpecification::AddIndex(AddIndex {
1108            if_not_exists,
1109            name,
1110            cols,
1111            index_type,
1112            ..
1113        }) => {
1114            for col in &cols {
1115                if let qusql_parse::IndexColExpr::Column(cname) = &col.expr
1116                    && e.get_column(cname.value).is_none()
1117                {
1118                    issues
1119                        .err("No such column in table", col)
1120                        .frag("Table defined here", table_ref);
1121                }
1122            }
1123            // PRIMARY KEY implies NOT NULL on each listed column.
1124            if matches!(index_type, qusql_parse::IndexType::Primary(_)) {
1125                for col in &cols {
1126                    if let qusql_parse::IndexColExpr::Column(cname) = &col.expr
1127                        && let Some(c) = e.get_column_mut(cname.value)
1128                    {
1129                        c.type_.not_null = true;
1130                    }
1131                }
1132            }
1133            if let Some(name) = &name {
1134                let ident = if options.parse_options.get_dialect().is_postgresql() {
1135                    IndexKey {
1136                        table: None,
1137                        index: name.clone(),
1138                    }
1139                } else {
1140                    IndexKey {
1141                        table: Some(unqualified_name(issues, table_ref).clone()),
1142                        index: name.clone(),
1143                    }
1144                };
1145                if let Some(old) = indices.insert(ident, name.span())
1146                    && if_not_exists.is_none()
1147                {
1148                    issues
1149                        .err("Multiple indices with the same identifier", &name.span())
1150                        .frag("Already defined here", &old);
1151                }
1152            }
1153        }
1154        qusql_parse::AlterSpecification::AddForeignKey { .. } => {}
1155        qusql_parse::AlterSpecification::Modify(ModifyColumn {
1156            if_exists,
1157            col,
1158            definition,
1159            ..
1160        }) => match e.get_column_mut(col.value) {
1161            Some(c) => {
1162                let new_col = parse_column(
1163                    definition,
1164                    c.identifier.clone(),
1165                    issues,
1166                    Some(options),
1167                    Some(types),
1168                );
1169                *c = new_col;
1170            }
1171            None if if_exists.is_none() => {
1172                issues
1173                    .err("No such column in table", &col)
1174                    .frag("Table defined here", &e.identifier_span);
1175            }
1176            None => {}
1177        },
1178        qusql_parse::AlterSpecification::AddColumn(AddColumn {
1179            identifier,
1180            data_type,
1181            if_not_exists_span,
1182            ..
1183        }) => {
1184            if e.get_column(identifier.value).is_some() {
1185                if if_not_exists_span.is_none() {
1186                    issues
1187                        .err("Column already exists in table", &identifier)
1188                        .frag("Table defined here", &e.identifier_span);
1189                }
1190            } else {
1191                e.columns.push(parse_column(
1192                    data_type,
1193                    identifier,
1194                    issues,
1195                    Some(options),
1196                    Some(types),
1197                ));
1198            }
1199        }
1200        qusql_parse::AlterSpecification::OwnerTo { .. } => {}
1201        qusql_parse::AlterSpecification::DropColumn(DropColumn {
1202            column, if_exists, ..
1203        }) => {
1204            let cnt = e.columns.len();
1205            e.columns.retain(|c| c.identifier != column);
1206            if cnt == e.columns.len() && if_exists.is_none() {
1207                issues
1208                    .err("No such column in table", &column)
1209                    .frag("Table defined here", &e.identifier_span);
1210            }
1211        }
1212        qusql_parse::AlterSpecification::AlterColumn(AlterColumn {
1213            column,
1214            alter_column_action,
1215            ..
1216        }) => match e.get_column_mut(column.value) {
1217            Some(c) => match alter_column_action {
1218                qusql_parse::AlterColumnAction::SetDefault { .. } => c.default = true,
1219                qusql_parse::AlterColumnAction::DropDefault { .. } => c.default = false,
1220                qusql_parse::AlterColumnAction::Type { type_, .. } => {
1221                    *c = parse_column(type_, column, issues, Some(options), Some(types));
1222                }
1223                qusql_parse::AlterColumnAction::SetNotNull { .. } => c.type_.not_null = true,
1224                qusql_parse::AlterColumnAction::DropNotNull { .. } => c.type_.not_null = false,
1225                a @ qusql_parse::AlterColumnAction::AddGenerated { .. } => {
1226                    issues.err("not implemented", &a);
1227                }
1228            },
1229            None => {
1230                issues
1231                    .err("No such column in table", &column)
1232                    .frag("Table defined here", &e.identifier_span);
1233            }
1234        },
1235        qusql_parse::AlterSpecification::DropIndex(drop_idx) => {
1236            let is_postgresql = options.parse_options.get_dialect().is_postgresql();
1237            let key = if is_postgresql {
1238                IndexKey {
1239                    table: None,
1240                    index: drop_idx.name.clone(),
1241                }
1242            } else {
1243                IndexKey {
1244                    table: Some(unqualified_name(issues, table_ref).clone()),
1245                    index: drop_idx.name.clone(),
1246                }
1247            };
1248            if indices.remove(&key).is_none() {
1249                issues.err("No such index to drop", &drop_idx.name);
1250            }
1251        }
1252        qusql_parse::AlterSpecification::RenameColumn(qusql_parse::RenameColumn {
1253            old_col_name,
1254            new_col_name,
1255            ..
1256        }) => match e.get_column_mut(old_col_name.value) {
1257            Some(c) => c.identifier = new_col_name,
1258            None => {
1259                issues
1260                    .err("No such column in table", &old_col_name)
1261                    .frag("Table defined here", &e.identifier_span);
1262            }
1263        },
1264        qusql_parse::AlterSpecification::RenameIndex(qusql_parse::RenameIndex {
1265            old_index_name,
1266            new_index_name,
1267            ..
1268        }) => {
1269            let is_postgresql = options.parse_options.get_dialect().is_postgresql();
1270            let table_id = unqualified_name(issues, table_ref).clone();
1271            let old_key = if is_postgresql {
1272                IndexKey {
1273                    table: None,
1274                    index: old_index_name.clone(),
1275                }
1276            } else {
1277                IndexKey {
1278                    table: Some(table_id.clone()),
1279                    index: old_index_name.clone(),
1280                }
1281            };
1282            match indices.remove(&old_key) {
1283                Some(span) => {
1284                    let new_key = if is_postgresql {
1285                        IndexKey {
1286                            table: None,
1287                            index: new_index_name,
1288                        }
1289                    } else {
1290                        IndexKey {
1291                            table: Some(table_id),
1292                            index: new_index_name,
1293                        }
1294                    };
1295                    indices.insert(new_key, span);
1296                }
1297                None => {
1298                    issues.err("No such index to rename", &old_index_name);
1299                }
1300            }
1301        }
1302        // We do not track constraints, so RENAME CONSTRAINT is a no-op.
1303        qusql_parse::AlterSpecification::RenameConstraint(_) => {}
1304        // RENAME TO inside ALTER TABLE requires renaming the table's map key, which is
1305        // not accessible here.  The standalone RENAME TABLE statement (handled by
1306        // process_rename_table) should be preferred; this variant is left as a no-op.
1307        qusql_parse::AlterSpecification::RenameTo(_) => {}
1308        qusql_parse::AlterSpecification::Change(qusql_parse::Change {
1309            column,
1310            new_column,
1311            definition,
1312            ..
1313        }) => match e.get_column_mut(column.value) {
1314            Some(c) => {
1315                *c = parse_column(definition, new_column, issues, Some(options), Some(types));
1316            }
1317            None => {
1318                issues
1319                    .err("No such column in table", &column)
1320                    .frag("Table defined here", &e.identifier_span);
1321            }
1322        },
1323        qusql_parse::AlterSpecification::Lock { .. }
1324        | qusql_parse::AlterSpecification::DropForeignKey { .. }
1325        | qusql_parse::AlterSpecification::DropPrimaryKey { .. }
1326        | qusql_parse::AlterSpecification::Algorithm { .. }
1327        | qusql_parse::AlterSpecification::AutoIncrement { .. }
1328        | qusql_parse::AlterSpecification::ReplicaIdentity(_)
1329        | qusql_parse::AlterSpecification::ValidateConstraint(_)
1330        | qusql_parse::AlterSpecification::AddTableConstraint(_)
1331        | qusql_parse::AlterSpecification::DisableTrigger(_)
1332        | qusql_parse::AlterSpecification::EnableTrigger(_)
1333        | qusql_parse::AlterSpecification::DisableRule(_)
1334        | qusql_parse::AlterSpecification::EnableRule(_)
1335        | qusql_parse::AlterSpecification::DisableRowLevelSecurity(_)
1336        | qusql_parse::AlterSpecification::EnableRowLevelSecurity(_)
1337        | qusql_parse::AlterSpecification::ForceRowLevelSecurity(_)
1338        | qusql_parse::AlterSpecification::NoForceRowLevelSecurity(_) => {}
1339    }
1340}
1341
1342impl<'a, 'b> SchemaCtx<'a, 'b> {
1343    fn process_drop_table(&mut self, t: qusql_parse::DropTable<'a>) {
1344        for i in t.tables {
1345            match self
1346                .schemas
1347                .schemas
1348                .entry(unqualified_name(self.issues, &i).clone())
1349            {
1350                alloc::collections::btree_map::Entry::Occupied(e) => {
1351                    if e.get().view {
1352                        self.issues
1353                            .err("Name defines a view not a table", &i)
1354                            .frag("View defined here", &e.get().identifier_span);
1355                    } else {
1356                        e.remove();
1357                    }
1358                }
1359                alloc::collections::btree_map::Entry::Vacant(_) => {
1360                    if t.if_exists.is_none() {
1361                        self.issues
1362                            .err("A table with this name does not exist to drop", &i);
1363                    }
1364                }
1365            }
1366        }
1367    }
1368
1369    fn process_drop_view(&mut self, v: qusql_parse::DropView<'a>) {
1370        for i in v.views {
1371            match self
1372                .schemas
1373                .schemas
1374                .entry(unqualified_name(self.issues, &i).clone())
1375            {
1376                alloc::collections::btree_map::Entry::Occupied(e) => {
1377                    if !e.get().view {
1378                        self.issues
1379                            .err("Name defines a table not a view", &i)
1380                            .frag("Table defined here", &e.get().identifier_span);
1381                    } else {
1382                        e.remove();
1383                    }
1384                }
1385                alloc::collections::btree_map::Entry::Vacant(_) => {
1386                    if v.if_exists.is_none() {
1387                        self.issues
1388                            .err("A view with this name does not exist to drop", &i);
1389                    }
1390                }
1391            }
1392        }
1393    }
1394
1395    fn process_drop_function(&mut self, f: qusql_parse::DropFunction<'a>) {
1396        for (func_name, _args) in &f.functions {
1397            match self
1398                .schemas
1399                .functions
1400                .entry(unqualified_name(self.issues, func_name).clone())
1401            {
1402                alloc::collections::btree_map::Entry::Occupied(e) => {
1403                    e.remove();
1404                }
1405                alloc::collections::btree_map::Entry::Vacant(_) => {
1406                    if f.if_exists.is_none() {
1407                        self.issues.err(
1408                            "A function with this name does not exist to drop",
1409                            func_name,
1410                        );
1411                    }
1412                }
1413            }
1414        }
1415    }
1416
1417    fn process_drop_procedure(&mut self, p: qusql_parse::DropProcedure<'a>) {
1418        let name = unqualified_name(self.issues, &p.procedure);
1419        match self.schemas.procedures.entry(name.clone()) {
1420            alloc::collections::btree_map::Entry::Occupied(e) => {
1421                e.remove();
1422            }
1423            alloc::collections::btree_map::Entry::Vacant(_) => {
1424                if p.if_exists.is_none() {
1425                    self.issues.err(
1426                        "A procedure with this name does not exist to drop",
1427                        &p.procedure,
1428                    );
1429                }
1430            }
1431        }
1432    }
1433
1434    fn process_drop_index(&mut self, ci: qusql_parse::DropIndex<'a>) {
1435        let key = IndexKey {
1436            table: ci.on.as_ref().map(|(_, t)| t.identifier.clone()),
1437            index: ci.index_name.clone(),
1438        };
1439        if self.schemas.indices.remove(&key).is_none() && ci.if_exists.is_none() {
1440            self.issues.err("No such index", &ci);
1441        }
1442    }
1443    /// DO $$ ... $$: re-parse the dollar-quoted body and recurse.
1444    fn process_do(&mut self, d: qusql_parse::Do<'a>) -> Result<(), ()> {
1445        match d.body {
1446            qusql_parse::DoBody::Statements(stmts) => self.process_statements(stmts)?,
1447            qusql_parse::DoBody::String(s, _) => {
1448                let span_offset = s.as_ptr() as usize - self.src.as_ptr() as usize;
1449                let body_opts = self
1450                    .options
1451                    .parse_options
1452                    .clone()
1453                    .function_body(true)
1454                    .span_offset(span_offset);
1455                let stmts = parse_statements(s, self.issues, &body_opts);
1456                self.process_statements(stmts)?;
1457            }
1458        }
1459        Ok(())
1460    }
1461
1462    /// IF ... THEN / ELSEIF / ELSE: recurse into all branches.
1463    fn process_if(&mut self, i: qusql_parse::If<'a>) -> Result<(), ()> {
1464        for cond in i.conditions {
1465            if self.eval_condition(&cond.search_condition)? {
1466                return self.process_statements(cond.then);
1467            }
1468        }
1469        if let Some((_, stmts)) = i.else_ {
1470            return self.process_statements(stmts);
1471        }
1472        Ok(())
1473    }
1474
1475    /// SELECT used at the top level of a schema file must be a bare list of
1476    /// function calls with no FROM / WHERE / LIMIT / etc.  Each expression is
1477    /// dispatched to `process_expression`.
1478    fn process_select(&mut self, s: qusql_parse::Select<'a>) -> Result<(), ()> {
1479        // Reject anything that looks like a real query.
1480        if let Some(span) = &s.from_span {
1481            self.issues
1482                .err("SELECT with FROM is not supported at schema level", span);
1483            return Err(());
1484        }
1485        if let Some((_, span)) = &s.where_ {
1486            self.issues
1487                .err("SELECT with WHERE is not supported at schema level", span);
1488            return Err(());
1489        }
1490        if let Some((span, _)) = &s.group_by {
1491            self.issues.err(
1492                "SELECT with GROUP BY is not supported at schema level",
1493                span,
1494            );
1495            return Err(());
1496        }
1497        if let Some((_, span)) = &s.having {
1498            self.issues
1499                .err("SELECT with HAVING is not supported at schema level", span);
1500            return Err(());
1501        }
1502        if let Some((span, _, _)) = &s.limit {
1503            self.issues
1504                .err("SELECT with LIMIT is not supported at schema level", span);
1505            return Err(());
1506        }
1507        if let Some((span, _)) = &s.order_by {
1508            self.issues.err(
1509                "SELECT with ORDER BY is not supported at schema level",
1510                span,
1511            );
1512            return Err(());
1513        }
1514        for se in s.select_exprs {
1515            self.process_expression(se.expr)?;
1516        }
1517        Ok(())
1518    }
1519
1520    fn process_expression(&mut self, expr: Expression<'a>) -> Result<(), ()> {
1521        self.eval_expr(&expr).map(|_| ())
1522    }
1523
1524    fn process_update(&mut self, u: qusql_parse::Update<'a>) -> Result<(), ()> {
1525        // We cannot evaluate SET expressions. Error out if any target table has
1526        // tracked rows whose state would be changed; if there are no tracked
1527        // rows the update has no effect on our model.
1528        let span = u.update_span;
1529        for tref in u.tables {
1530            if let qusql_parse::TableReference::Table { identifier, .. } = tref
1531                && identifier.prefix.is_empty()
1532                && self
1533                    .rows
1534                    .get(identifier.identifier.value)
1535                    .is_some_and(|r| !r.is_empty())
1536            {
1537                self.issues.err(
1538                    "UPDATE on a table with tracked rows is not supported in schema evaluator",
1539                    &span,
1540                );
1541                return Err(());
1542            }
1543        }
1544        Ok(())
1545    }
1546
1547    fn process_delete(&mut self, d: qusql_parse::Delete<'a>) -> Result<(), ()> {
1548        let qusql_parse::Delete {
1549            tables,
1550            using,
1551            where_,
1552            order_by,
1553            limit,
1554            ..
1555        } = d;
1556
1557        // USING / ORDER BY / LIMIT are not supported: error if any target table
1558        // has tracked rows that would be affected.
1559        let has_unsupported = !using.is_empty() || order_by.is_some() || limit.is_some();
1560        if has_unsupported {
1561            for table in &tables {
1562                if table.prefix.is_empty()
1563                    && self
1564                        .rows
1565                        .get(table.identifier.value)
1566                        .is_some_and(|r| !r.is_empty())
1567                {
1568                    self.issues.err(
1569                        "DELETE with USING/ORDER BY/LIMIT on a table with tracked rows \
1570                         is not supported in schema evaluator",
1571                        table,
1572                    );
1573                    return Err(());
1574                }
1575            }
1576        }
1577
1578        if let Some((where_expr, _)) = where_ {
1579            // Evaluate the WHERE for each tracked row; keep rows that do NOT match.
1580            for table in &tables {
1581                if table.prefix.is_empty() {
1582                    let name = table.identifier.value;
1583                    let Some(source_rows) = self.rows.get(name) else {
1584                        continue;
1585                    };
1586                    let source_rows = source_rows.clone();
1587                    let mut new_rows = Vec::new();
1588                    for row in source_rows {
1589                        let saved = self.current_row.replace(row.clone());
1590                        let matches = self.eval_expr(&where_expr).map(|v| v.is_truthy());
1591                        self.current_row = saved;
1592                        match matches {
1593                            Ok(true) => {} // row is deleted
1594                            Ok(false) => new_rows.push(row),
1595                            Err(()) => return Err(()),
1596                        }
1597                    }
1598                    self.rows.insert(name, new_rows);
1599                }
1600            }
1601        } else {
1602            // No WHERE - all rows in every target table are deleted.
1603            for table in tables {
1604                if table.prefix.is_empty() {
1605                    self.rows.remove(table.identifier.value);
1606                }
1607            }
1608        }
1609        Ok(())
1610    }
1611
1612    fn process_alter_type(&mut self, a: qusql_parse::AlterType<'a>) {
1613        let name = unqualified_name(self.issues, &a.name);
1614        match a.action {
1615            qusql_parse::AlterTypeAction::AddValue {
1616                if_not_exists_span,
1617                new_enum_value,
1618                ..
1619            } => {
1620                let Some(TypeDef::Enum { values, .. }) = self.schemas.types.get_mut(name) else {
1621                    self.issues.err("Type not found", &a.name);
1622                    return;
1623                };
1624                let new_val: Cow<'a, str> = new_enum_value.value;
1625                if values.contains(&new_val) {
1626                    if if_not_exists_span.is_none() {
1627                        self.issues.err("Enum value already exists", &a.name);
1628                    }
1629                    // IF NOT EXISTS: silently skip
1630                } else {
1631                    Arc::make_mut(values).push(new_val);
1632                }
1633            }
1634            qusql_parse::AlterTypeAction::RenameTo { new_name, .. } => {
1635                if let Some(typedef) = self.schemas.types.remove(name) {
1636                    self.schemas.types.insert(new_name, typedef);
1637                } else {
1638                    self.issues.err("Type not found", &a.name);
1639                }
1640            }
1641            qusql_parse::AlterTypeAction::RenameValue {
1642                existing_enum_value,
1643                new_enum_value,
1644                ..
1645            } => {
1646                let Some(TypeDef::Enum { values, .. }) = self.schemas.types.get_mut(name) else {
1647                    self.issues.err("Type not found", &a.name);
1648                    return;
1649                };
1650                let old_val: Cow<'a, str> = existing_enum_value.value;
1651                let new_val: Cow<'a, str> = new_enum_value.value;
1652                if let Some(entry) = Arc::make_mut(values).iter_mut().find(|v| **v == old_val) {
1653                    *entry = new_val;
1654                } else {
1655                    self.issues.err("Enum value not found", &a.name);
1656                }
1657            }
1658            // Other ALTER TYPE actions (OWNER TO, SET SCHEMA, etc.) have no effect on
1659            // the parts of the schema we track.
1660            _ => {}
1661        }
1662    }
1663
1664    fn process_truncate_table(&mut self, t: qusql_parse::TruncateTable<'a>) {
1665        for spec in t.tables {
1666            let name = unqualified_name(self.issues, &spec.table_name);
1667            self.rows.remove(name.value);
1668        }
1669    }
1670
1671    fn process_rename_table(&mut self, r: qusql_parse::RenameTable<'a>) {
1672        for pair in r.table_to_tables {
1673            let old_id = unqualified_name(self.issues, &pair.table);
1674            let new_id = unqualified_name(self.issues, &pair.new_table);
1675            // Rename in schemas map.
1676            if let Some(schema) = self.schemas.schemas.remove(old_id) {
1677                self.schemas.schemas.insert(new_id.clone(), schema);
1678            } else {
1679                self.issues.err("Table not found", &pair.table);
1680            }
1681            // Rename tracked rows if present.
1682            if let Some(rows) = self.rows.remove(old_id.value) {
1683                self.rows.insert(new_id.value, rows);
1684            }
1685        }
1686    }
1687
1688    fn process_insert(&mut self, i: qusql_parse::InsertReplace<'a>) -> Result<(), ()> {
1689        // Only unqualified table names are tracked.
1690        let table_name = match i.table.prefix.as_slice() {
1691            [] => i.table.identifier.value,
1692            _ => return Ok(()),
1693        };
1694        let col_names: Vec<&'a str> = i.columns.iter().map(|c| c.value).collect();
1695
1696        if let Some(set) = i.set {
1697            // INSERT ... SET col = expr, ...: evaluate as a single-row VALUES insert.
1698            let mut row: Vec<(&'a str, SqlValue<'a>)> = Vec::new();
1699            for pair in set.pairs {
1700                if let Ok(val) = self.eval_expr(&pair.value) {
1701                    row.push((pair.column.value, val));
1702                }
1703            }
1704            self.rows.entry(table_name).or_default().push(Rc::new(row));
1705            return Ok(());
1706        }
1707
1708        if let Some((_, value_rows)) = i.values {
1709            // INSERT ... VALUES (...)
1710            for row_exprs in value_rows {
1711                let mut row: Vec<(&'a str, SqlValue<'a>)> = Vec::new();
1712                for (col, expr) in col_names.iter().zip(row_exprs.iter()) {
1713                    if let Ok(val) = self.eval_expr(expr) {
1714                        row.push((col, val));
1715                    }
1716                }
1717                self.rows.entry(table_name).or_default().push(Rc::new(row));
1718            }
1719            return Ok(());
1720        }
1721
1722        let Some(select_stmt) = i.select else {
1723            return Ok(());
1724        };
1725        // Clone select expressions before eval borrows `self`.
1726        // Only available for a plain SELECT; compound queries (UNION etc.) produce no
1727        // named expressions to project, so we track rows without column values.
1728        let select_exprs: Vec<_> = if let qusql_parse::Statement::Select(s) = &select_stmt {
1729            s.select_exprs.iter().map(|se| se.expr.clone()).collect()
1730        } else {
1731            Vec::new()
1732        };
1733        let source_rows = self.eval_statement_rows(&select_stmt)?;
1734        for source_row in source_rows {
1735            let saved_row = self.current_row.replace(source_row);
1736            let mut row: Vec<(&'a str, SqlValue<'a>)> = Vec::new();
1737            for (col, expr) in col_names.iter().zip(select_exprs.iter()) {
1738                if let Ok(val) = self.eval_expr(expr) {
1739                    row.push((col, val));
1740                }
1741            }
1742            self.current_row = saved_row;
1743            self.rows.entry(table_name).or_default().push(Rc::new(row));
1744        }
1745        Ok(())
1746    }
1747
1748    /// Evaluate an expression to a `SqlValue`.
1749    /// Reads the current row from `self.current_row` (set by eval_select_matching_rows).
1750    /// Aggregate functions read rows from `self.current_table_rows` (set by eval_condition).
1751    /// Returns `Err(())` for expression types not yet handled by the evaluator.
1752    fn eval_expr(&mut self, expr: &Expression<'a>) -> Result<SqlValue<'a>, ()> {
1753        match expr {
1754            Expression::Null(_) => Ok(SqlValue::Null),
1755            Expression::Bool(b) => Ok(SqlValue::Bool(b.value)),
1756            Expression::Integer(i) => Ok(SqlValue::Integer(i.value as i64)),
1757            Expression::String(s) => Ok(match &s.value {
1758                Cow::Borrowed(b) => SqlValue::SourceText(b),
1759                Cow::Owned(o) => SqlValue::OwnedText(o.clone()),
1760            }),
1761            Expression::Identifier(id) => {
1762                if let [IdentifierPart::Name(name)] = id.parts.as_slice() {
1763                    self.bindings
1764                        .get(name.value)
1765                        .cloned()
1766                        .or_else(|| {
1767                            self.current_row.as_ref().and_then(|r| {
1768                                r.iter()
1769                                    .find(|(k, _)| *k == name.value)
1770                                    .map(|(_, v)| v.clone())
1771                            })
1772                        })
1773                        .ok_or(())
1774                } else {
1775                    Err(())
1776                }
1777            }
1778            Expression::Exists(e) => Ok(SqlValue::Bool(self.eval_exists(&e.subquery)?)),
1779            Expression::Unary(u) => match &u.op {
1780                qusql_parse::UnaryOperator::Not(_) | qusql_parse::UnaryOperator::LogicalNot(_) => {
1781                    Ok(SqlValue::Bool(!self.eval_expr(&u.operand)?.is_truthy()))
1782                }
1783                qusql_parse::UnaryOperator::Minus(_) => match self.eval_expr(&u.operand)? {
1784                    SqlValue::Integer(i) => Ok(SqlValue::Integer(-i)),
1785                    _ => Err(()),
1786                },
1787                _ => Err(()),
1788            },
1789            Expression::Function(f) => self.eval_function_expr(f),
1790            Expression::AggregateFunction(f) => self.eval_aggregate(f),
1791            Expression::Binary(b) => self.eval_binary_expr(b),
1792            _ => {
1793                self.issues
1794                    .err("Unimplemented expression in schema evaluator", expr);
1795                Err(())
1796            }
1797        }
1798    }
1799
1800    fn eval_exists(&mut self, stmt: &qusql_parse::Statement<'a>) -> Result<bool, ()> {
1801        Ok(!self.eval_statement_rows(stmt)?.is_empty())
1802    }
1803
1804    /// Resolve the rows for a SELECT's FROM clause.
1805    /// Returns empty if there is no FROM clause.
1806    /// Errors (with a message) for:
1807    ///   - joins or non-table FROM references (e.g. subqueries in FROM)
1808    ///   - qualified table names (e.g. `information_schema.columns`)
1809    ///   - unqualified table names not known to the schema evaluator
1810    fn resolve_from_rows(&mut self, s: &qusql_parse::Select<'a>) -> Result<Vec<Row<'a>>, ()> {
1811        use qusql_parse::TableReference;
1812        let Some(refs) = s.table_references.as_deref() else {
1813            return Ok(Vec::new());
1814        };
1815        let [TableReference::Table { identifier, .. }] = refs else {
1816            self.issues.err(
1817                "FROM clause with joins or subqueries is not supported in schema evaluator",
1818                &refs[0],
1819            );
1820            return Err(());
1821        };
1822        if !identifier.prefix.is_empty() {
1823            // Synthesize information_schema.columns from the current schema state.
1824            if identifier.prefix.len() == 1
1825                && identifier.prefix[0]
1826                    .0
1827                    .value
1828                    .eq_ignore_ascii_case("information_schema")
1829                && identifier.identifier.value.eq_ignore_ascii_case("columns")
1830            {
1831                let rows = self
1832                    .schemas
1833                    .schemas
1834                    .iter()
1835                    .flat_map(|(table_id, schema)| {
1836                        schema.columns.iter().map(move |col| {
1837                            Rc::new(alloc::vec![
1838                                ("table_name", SqlValue::SourceText(table_id.value)),
1839                                ("column_name", SqlValue::SourceText(col.identifier.value)),
1840                            ])
1841                        })
1842                    })
1843                    .collect();
1844                return Ok(rows);
1845            }
1846            self.issues.err(
1847                "Qualified table name in FROM clause is not supported in schema evaluator",
1848                identifier,
1849            );
1850            return Err(());
1851        }
1852        let name = identifier.identifier.value;
1853        let known =
1854            self.rows.contains_key(name) || self.schemas.schemas.keys().any(|k| k.value == name);
1855        if !known {
1856            self.issues.err(
1857                alloc::format!("Unknown table `{name}` referenced in schema evaluator"),
1858                &identifier.identifier,
1859            );
1860            return Err(());
1861        }
1862        Ok(self.rows.get(name).cloned().unwrap_or_default())
1863    }
1864
1865    /// Evaluate any SELECT-like statement (plain SELECT or UNION/INTERSECT/EXCEPT compound
1866    /// query) to a list of result rows.
1867    fn eval_statement_rows(
1868        &mut self,
1869        stmt: &qusql_parse::Statement<'a>,
1870    ) -> Result<Vec<Row<'a>>, ()> {
1871        match stmt {
1872            qusql_parse::Statement::Select(s) => self.eval_select_matching_rows(s),
1873            qusql_parse::Statement::CompoundQuery(cq) => self.eval_compound_query_rows(cq),
1874            _ => {
1875                self.issues
1876                    .err("Unsupported statement kind in INSERT ... SELECT", stmt);
1877                Err(())
1878            }
1879        }
1880    }
1881
1882    /// Evaluate a UNION / INTERSECT / EXCEPT compound query to rows.
1883    /// Only UNION (ALL or deduplicated) is supported; INTERSECT and EXCEPT error out.
1884    fn eval_compound_query_rows(
1885        &mut self,
1886        cq: &qusql_parse::CompoundQuery<'a>,
1887    ) -> Result<Vec<Row<'a>>, ()> {
1888        use qusql_parse::CompoundOperator;
1889        let mut result = self.eval_statement_rows(&cq.left)?;
1890        for branch in &cq.with {
1891            match branch.operator {
1892                CompoundOperator::Union => {
1893                    let branch_rows = self.eval_statement_rows(&branch.statement)?;
1894                    result.extend(branch_rows);
1895                }
1896                CompoundOperator::Intersect | CompoundOperator::Except => {
1897                    self.issues.err(
1898                        "INTERSECT / EXCEPT is not supported in schema evaluator",
1899                        &branch.operator_span,
1900                    );
1901                    return Err(());
1902                }
1903            }
1904        }
1905        Ok(result)
1906    }
1907
1908    /// Return the rows from the single table in a SELECT that satisfy the WHERE clause.
1909    fn eval_select_matching_rows(
1910        &mut self,
1911        s: &qusql_parse::Select<'a>,
1912    ) -> Result<Vec<Row<'a>>, ()> {
1913        let source_rows = self.resolve_from_rows(s)?;
1914        let where_expr: Option<Expression<'a>> = s.where_.as_ref().map(|(e, _)| e.clone());
1915        let mut result = Vec::new();
1916        for row in source_rows {
1917            let saved_row = self.current_row.replace(row.clone());
1918            let eval_result = match &where_expr {
1919                Some(expr) => self.eval_expr(expr).map(|v| v.is_truthy()),
1920                None => Ok(true),
1921            };
1922            self.current_row = saved_row;
1923            if eval_result? {
1924                result.push(row);
1925            }
1926        }
1927        Ok(result)
1928    }
1929
1930    fn eval_function_expr(
1931        &mut self,
1932        f: &qusql_parse::FunctionCallExpression<'a>,
1933    ) -> Result<SqlValue<'a>, ()> {
1934        use qusql_parse::Function;
1935        match &f.function {
1936            Function::Other(parts) if parts.len() == 1 => {
1937                let func_name = parts[0].value;
1938                let func_info = self
1939                    .schemas
1940                    .functions
1941                    .values()
1942                    .find(|func| func.name.value == func_name)
1943                    .and_then(|func| {
1944                        func.body
1945                            .as_ref()
1946                            .map(|b| (func.params.clone(), b.statements.clone()))
1947                    });
1948                let Some((params, statements)) = func_info else {
1949                    self.issues.err(
1950                        alloc::format!(
1951                            "Unknown function or function has no evaluable body: {func_name}"
1952                        ),
1953                        f,
1954                    );
1955                    return Err(());
1956                };
1957                let mut bindings = BTreeMap::new();
1958                for (param, arg) in params.iter().zip(f.args.iter()) {
1959                    let Some(name) = &param.name else { continue };
1960                    if let Ok(value) = self.eval_expr(arg) {
1961                        bindings.insert(name.value, value);
1962                    }
1963                }
1964                let old_bindings = core::mem::replace(&mut self.bindings, bindings);
1965                let old_return = self.return_value.take();
1966                let _ = self.process_statements(statements);
1967                let ret = self.return_value.take().unwrap_or(SqlValue::Null);
1968                self.return_value = old_return;
1969                self.bindings = old_bindings;
1970                Ok(ret)
1971            }
1972            Function::Coalesce => {
1973                // Clone args to avoid borrow conflict between &f and &mut self.
1974                let args: Vec<_> = f.args.clone();
1975                for arg in &args {
1976                    let v = self.eval_expr(arg)?;
1977                    if v != SqlValue::Null {
1978                        return Ok(v);
1979                    }
1980                }
1981                Ok(SqlValue::Null)
1982            }
1983            Function::Exists => {
1984                let Some(Expression::Subquery(sq)) = f.args.first() else {
1985                    self.issues.err("EXISTS without subquery argument", f);
1986                    return Err(());
1987                };
1988                let qusql_parse::Statement::Select(s) = &sq.expression else {
1989                    self.issues.err("EXISTS argument is not a SELECT", f);
1990                    return Err(());
1991                };
1992                let s = s.clone();
1993                Ok(SqlValue::Bool(
1994                    !self.eval_select_matching_rows(&s)?.is_empty(),
1995                ))
1996            }
1997            _ => {
1998                self.issues
1999                    .err("Unimplemented function in schema evaluator", f);
2000                Err(())
2001            }
2002        }
2003    }
2004
2005    fn eval_aggregate(
2006        &mut self,
2007        f: &qusql_parse::AggregateFunctionCallExpression<'a>,
2008    ) -> Result<SqlValue<'a>, ()> {
2009        use qusql_parse::Function;
2010        match &f.function {
2011            Function::Max => {
2012                let col_expr = f.args.first().ok_or(())?.clone();
2013                // Take rows out so we can call &mut self methods during iteration.
2014                let rows = core::mem::take(&mut self.current_table_rows);
2015                let mut max: Option<SqlValue<'a>> = None;
2016                for r in &rows {
2017                    // Set current_row so column references in the expression resolve correctly.
2018                    let saved = self.current_row.replace(r.clone());
2019                    // Skip rows where evaluation fails (NULL semantics for aggregates).
2020                    let v = self.eval_expr(&col_expr);
2021                    self.current_row = saved;
2022                    if let Ok(v) = v
2023                        && v != SqlValue::Null
2024                    {
2025                        max = Some(match max {
2026                            None => v,
2027                            Some(SqlValue::Integer(m)) => {
2028                                if let SqlValue::Integer(n) = &v {
2029                                    SqlValue::Integer(m.max(*n))
2030                                } else {
2031                                    v
2032                                }
2033                            }
2034                            Some(existing) => existing,
2035                        });
2036                    }
2037                }
2038                self.current_table_rows = rows;
2039                Ok(max.unwrap_or(SqlValue::Null))
2040            }
2041            Function::Count => {
2042                let rows = core::mem::take(&mut self.current_table_rows);
2043                let is_star = matches!(
2044                    f.args.first(),
2045                    Some(Expression::Identifier(ie))
2046                        if matches!(ie.parts.as_slice(), [IdentifierPart::Star(_)])
2047                );
2048                let count = if f.args.is_empty() || is_star {
2049                    // COUNT(*) or COUNT() - count all rows
2050                    rows.len() as i64
2051                } else {
2052                    let col_expr = f.args.first().unwrap().clone();
2053                    let mut n = 0i64;
2054                    for r in &rows {
2055                        let saved = self.current_row.replace(r.clone());
2056                        let v = self.eval_expr(&col_expr);
2057                        self.current_row = saved;
2058                        if matches!(v, Ok(v) if v != SqlValue::Null) {
2059                            n += 1;
2060                        }
2061                    }
2062                    n
2063                };
2064                self.current_table_rows = rows;
2065                Ok(SqlValue::Integer(count))
2066            }
2067            _ => {
2068                self.issues
2069                    .err("Unimplemented aggregate function in schema evaluator", f);
2070                Err(())
2071            }
2072        }
2073    }
2074
2075    fn eval_binary_expr(
2076        &mut self,
2077        b: &qusql_parse::BinaryExpression<'a>,
2078    ) -> Result<SqlValue<'a>, ()> {
2079        use qusql_parse::BinaryOperator;
2080        let lhs = self.eval_expr(&b.lhs)?;
2081        // Short-circuit AND/OR before evaluating rhs.
2082        match &b.op {
2083            BinaryOperator::And(_) => {
2084                if !lhs.is_truthy() {
2085                    return Ok(SqlValue::Bool(false));
2086                }
2087                return Ok(SqlValue::Bool(self.eval_expr(&b.rhs)?.is_truthy()));
2088            }
2089            BinaryOperator::Or(_) => {
2090                if lhs.is_truthy() {
2091                    return Ok(SqlValue::Bool(true));
2092                }
2093                return Ok(SqlValue::Bool(self.eval_expr(&b.rhs)?.is_truthy()));
2094            }
2095            _ => {}
2096        }
2097        let rhs = self.eval_expr(&b.rhs)?;
2098        // NULL comparisons propagate NULL (not an error).
2099        Ok(match &b.op {
2100            BinaryOperator::Eq(_) => lhs.sql_eq(&rhs).map_or(SqlValue::Null, SqlValue::Bool),
2101            BinaryOperator::Neq(_) => lhs
2102                .sql_eq(&rhs)
2103                .map_or(SqlValue::Null, |v| SqlValue::Bool(!v)),
2104            BinaryOperator::LtEq(_) => lhs.sql_lte(&rhs).map_or(SqlValue::Null, SqlValue::Bool),
2105            BinaryOperator::Lt(_) => rhs
2106                .sql_lte(&lhs)
2107                .map_or(SqlValue::Null, |v| SqlValue::Bool(!v)),
2108            BinaryOperator::GtEq(_) => rhs.sql_lte(&lhs).map_or(SqlValue::Null, SqlValue::Bool),
2109            BinaryOperator::Gt(_) => lhs
2110                .sql_lte(&rhs)
2111                .map_or(SqlValue::Null, |v| SqlValue::Bool(!v)),
2112            _ => {
2113                self.issues
2114                    .err("Unimplemented binary operator in schema evaluator", b);
2115                return Err(());
2116            }
2117        })
2118    }
2119
2120    /// Evaluate the search condition of an IF branch as a boolean.
2121    /// Sets `self.current_table_rows` from the FROM clause (if any) so that
2122    /// aggregate expressions in the condition can read the table rows.
2123    /// Returns `Err(())` if the condition uses constructs the evaluator does not handle.
2124    fn eval_condition(&mut self, s: &qusql_parse::Select<'a>) -> Result<bool, ()> {
2125        let expr = s.select_exprs.first().map(|se| se.expr.clone()).ok_or(())?;
2126
2127        // Load the FROM table's rows into current_table_rows for aggregate evaluation.
2128        let table_rows = self.resolve_from_rows(s)?;
2129        let saved = core::mem::replace(&mut self.current_table_rows, table_rows);
2130        let result = self.eval_expr(&expr);
2131        self.current_table_rows = saved;
2132
2133        Ok(result?.is_truthy())
2134    }
2135
2136    fn process_plpgsql_execute(&mut self, e: qusql_parse::PlpgsqlExecute<'a>) -> Result<(), ()> {
2137        let sql = self.resolve_expr_to_bound_string(&e.command);
2138        let Some(sql) = sql else {
2139            self.issues.err(
2140                "EXECUTE argument could not be resolved to a known SQL string",
2141                &e,
2142            );
2143            return Err(());
2144        };
2145        let span_offset = sql.as_ptr() as usize - self.src.as_ptr() as usize;
2146        let opts = self.options.parse_options.clone().span_offset(span_offset);
2147        let stmts = parse_statements(sql, self.issues, &opts);
2148        let _ = self.process_statements(stmts);
2149        Ok(())
2150    }
2151
2152    /// Try to resolve an expression to a `&'a str` from the current bindings.
2153    /// Only succeeds for bare identifier expressions that name a bound parameter
2154    /// whose value is a borrow from the original source text.
2155    fn resolve_expr_to_bound_string(&self, expr: &Expression<'a>) -> Option<&'a str> {
2156        let Expression::Identifier(ident) = expr else {
2157            return None;
2158        };
2159        let [IdentifierPart::Name(name)] = ident.parts.as_slice() else {
2160            return None;
2161        };
2162        self.bindings
2163            .get(name.value)
2164            .and_then(|v| v.as_source_text())
2165    }
2166}
2167
2168/// Parse a schema definition and return a terse description
2169///
2170/// Errors and warnings are added to issues. The schema is successfully
2171/// parsed if no errors are added to issues.
2172/// Built-in table/view definitions that are automatically available in
2173/// PostgreSQL and PostGIS databases.  These are injected into every
2174/// `Schemas` produced by [`parse_schemas`] when the dialect is
2175/// [`SQLDialect::PostgreSQL`] or [`SQLDialect::PostGIS`].
2176const POSTGRESQL_BUILTIN_SQL: &str = "
2177CREATE TABLE spatial_ref_sys (
2178    srid INTEGER NOT NULL,
2179    auth_name VARCHAR(256),
2180    auth_srid INTEGER,
2181    srtext VARCHAR(2048),
2182    proj4text VARCHAR(2048)
2183);
2184CREATE VIEW geometry_columns AS (
2185    SELECT '' AS f_table_catalog, '' AS f_table_schema, '' AS f_table_name,
2186           '' AS f_geometry_column, 0 AS coord_dimension, 0 AS srid, '' AS type
2187);
2188CREATE VIEW geography_columns AS (
2189    SELECT '' AS f_table_catalog, '' AS f_table_schema, '' AS f_table_name,
2190           '' AS f_geography_column, 0 AS coord_dimension, 0 AS srid, '' AS type
2191);
2192";
2193
2194pub fn parse_schemas<'a>(
2195    src: &'a str,
2196    issues: &mut Issues<'a>,
2197    options: &TypeOptions,
2198) -> Schemas<'a> {
2199    let statements = parse_statements(src, issues, &options.parse_options);
2200
2201    let mut schemas = Schemas {
2202        schemas: Default::default(),
2203        procedures: Default::default(),
2204        functions: Default::default(),
2205        indices: Default::default(),
2206        types: Default::default(),
2207    };
2208
2209    SchemaCtx::new(&mut schemas, issues, src, options).process_top_level_statements(statements);
2210
2211    let dummy_schemas = Schemas::default();
2212
2213    let mut typer = crate::typer::Typer {
2214        schemas: &dummy_schemas,
2215        issues,
2216        reference_types: Vec::new(),
2217        outer_reference_types: Vec::new(),
2218        arg_types: Default::default(),
2219        options,
2220        with_schemas: Default::default(),
2221    };
2222
2223    // Compute nullity of generated columns
2224    for (name, schema) in &mut schemas.schemas {
2225        if schema.columns.iter().all(|v| v.as_.is_none()) {
2226            continue;
2227        }
2228        typer.reference_types.clear();
2229        let mut columns = Vec::new();
2230        for c in &schema.columns {
2231            columns.push((c.identifier.clone(), c.type_.clone()));
2232        }
2233        typer.reference_types.push(crate::typer::ReferenceType {
2234            name: Some(name.clone()),
2235            span: schema.identifier_span.clone(),
2236            columns,
2237        });
2238        for c in &mut schema.columns {
2239            if let Some(as_) = &c.as_ {
2240                let full_type = crate::type_expression::type_expression(
2241                    &mut typer,
2242                    as_,
2243                    crate::type_expression::ExpressionFlags::default(),
2244                    BaseType::Any,
2245                );
2246                c.type_.not_null = full_type.not_null;
2247            }
2248        }
2249    }
2250
2251    // Inject dialect-specific built-in schemas so that system tables like
2252    // `spatial_ref_sys` are always resolvable without requiring the user to
2253    // declare them in their schema file.
2254    let dialect = options.parse_options.get_dialect();
2255    if dialect.is_postgresql() {
2256        // Coerce 'static to &'a str — sound because 'static: 'a.
2257        let builtin_src: &'a str = POSTGRESQL_BUILTIN_SQL;
2258        let builtin_options = TypeOptions::new().dialect(dialect);
2259        let builtin_stmts = parse_statements(
2260            builtin_src,
2261            &mut Issues::new(builtin_src),
2262            &builtin_options.parse_options,
2263        );
2264        let mut builtin_schemas: Schemas<'a> = Schemas::default();
2265        SchemaCtx::new(
2266            &mut builtin_schemas,
2267            &mut Issues::new(builtin_src),
2268            builtin_src,
2269            &builtin_options,
2270        )
2271        .process_top_level_statements(builtin_stmts);
2272        // User-defined tables take priority; only add entries not already present.
2273        for (k, v) in builtin_schemas.schemas {
2274            schemas.schemas.entry(k).or_insert(v);
2275        }
2276    }
2277
2278    schemas
2279}