Skip to main content

qusql_type/
schema.rs

1// Licensed under the Apache License, Version 2.0 (the "License");
2// you may not use this file except in compliance with the License.
3// You may obtain a copy of the License at
4//
5// http://www.apache.org/licenses/LICENSE-2.0
6//
7// Unless required by applicable law or agreed to in writing, software
8// distributed under the License is distributed on an "AS IS" BASIS,
9// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10// See the License for the specific language governing permissions and
11// limitations under the License.
12
13//! Parse and evaluate SQL schema definitions into a typed representation
14//! used for statement type-checking.
15//!
16//! Supports DDL statements (CREATE/ALTER/DROP for tables, views, functions,
17//! procedures, indices, and types) across MySQL/MariaDB, PostgreSQL/PostGIS,
18//! and SQLite dialects. Includes a limited schema-level evaluator that can
19//! interpret PL/pgSQL function bodies, DO blocks, IF/ELSE control flow,
20//! INSERT/DELETE/TRUNCATE for in-memory row tracking, and expressions
21//! (EXISTS, COALESCE, aggregates). PostgreSQL/PostGIS built-in schemas
22//! (e.g. `spatial_ref_sys`, `geometry_columns`) are injected automatically.
23//!
24//! ```
25//! use qusql_type::{schema::parse_schemas, TypeOptions, SQLDialect, Issues};
26//! let schemas = "
27//!     -- Table structure for table `events`
28//!     DROP TABLE IF EXISTS `events`;
29//!     CREATE TABLE `events` (
30//!       `id` bigint(20) NOT NULL,
31//!       `user` int(11) NOT NULL,
32//!       `event_key` int(11) NOT NULL,
33//!       `time` datetime NOT NULL
34//!     ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
35//!
36//!     -- Table structure for table `event_keys`
37//!     DROP TABLE IF EXISTS `event_keys`;
38//!     CREATE TABLE `event_keys` (
39//!       `id` int(11) NOT NULL,
40//!       `name` text NOT NULL
41//!     ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
42//!
43//!     -- Stand-in structure for view `events_view`
44//!     -- (See below for the actual view)
45//!     DROP VIEW IF EXISTS `events_view`;
46//!     CREATE TABLE `events_view` (
47//!         `id` int(11),
48//!         `user` int(11) NOT NULL,
49//!         `event_key` text NOT NULL,
50//!         `time` datetime NOT NULL
51//!     );
52//!
53//!     -- Indexes for table `events`
54//!     ALTER TABLE `events`
55//!       ADD PRIMARY KEY (`id`),
56//!       ADD KEY `time` (`time`),
57//!       ADD KEY `event_key` (`event_key`);
58//!
59//!     -- Indexes for table `event_keys`
60//!     ALTER TABLE `event_keys`
61//!       ADD PRIMARY KEY (`id`);
62//!
63//!     -- Constraints for table `events`
64//!     ALTER TABLE `events`
65//!       ADD CONSTRAINT `event_key` FOREIGN KEY (`event_key`) REFERENCES `event_keys` (`id`);
66//!
67//!     -- Structure for view `events_view`
68//!     DROP TABLE IF EXISTS `events_view`;
69//!     DROP VIEW IF EXISTS `events_view`;
70//!     CREATE ALGORITHM=UNDEFINED DEFINER=`phpmyadmin`@`localhost`
71//!         SQL SECURITY DEFINER VIEW `events_view` AS
72//!         SELECT
73//!             `events`.`id` AS `id`,
74//!             `events`.`user` AS `user`,
75//!             `event_keys`.`name` AS `event_key`,
76//!             `events`.`time` AS `time`
77//!         FROM `events`, `event_keys`
78//!         WHERE `events`.`event_key` = `event_keys`.`id`;
79//!     ";
80//!
81//! let mut issues = Issues::new(schemas);
82//! let schemas = parse_schemas(schemas,
83//!     &mut issues,
84//!     &TypeOptions::new().dialect(SQLDialect::MariaDB));
85//!
86//! assert!(issues.is_ok());
87//!
88//! for (name, schema) in schemas.schemas {
89//!     println!("{name}: {schema:?}")
90//! }
91//! ```
92
93use crate::{
94    Type, TypeOptions,
95    type_::{BaseType, FullType},
96    type_statement,
97    typer::unqualified_name,
98};
99use alloc::{borrow::Cow, boxed::Box, collections::BTreeMap, rc::Rc, sync::Arc, vec::Vec};
100use qusql_parse::{
101    AddColumn, AddIndex, AlterColumn, DataType, DataTypeProperty, DropColumn, Expression,
102    FunctionParam, Identifier, IdentifierPart, Issues, ModifyColumn, Span, Spanned, Statement,
103    parse_statements,
104};
105
106/// A column in a schema
107#[derive(Debug, Clone)]
108pub struct Column<'a> {
109    pub identifier: Identifier<'a>,
110    /// Type of the column
111    pub type_: FullType<'a>,
112    /// True if the column is auto_increment
113    pub auto_increment: bool,
114    pub default: bool,
115    pub as_: Option<Expression<'a>>,
116    pub generated: bool,
117}
118
119/// Schema representing a table or view
120#[derive(Debug)]
121pub struct Schema<'a> {
122    /// Span of identifier
123    pub identifier_span: Span,
124    /// List of columns
125    pub columns: Vec<Column<'a>>,
126    /// True if this is a view instead of a table
127    pub view: bool,
128}
129
130impl<'a> Schema<'a> {
131    pub fn get_column(&self, identifier: &str) -> Option<&Column<'a>> {
132        self.columns
133            .iter()
134            .find(|&column| column.identifier.value == identifier)
135    }
136    pub fn get_column_mut(&mut self, identifier: &str) -> Option<&mut Column<'a>> {
137        self.columns
138            .iter_mut()
139            .find(|column| column.identifier.value == identifier)
140    }
141}
142
143/// A stored procedure definition
144#[derive(Debug)]
145pub struct ProcedureDef<'a> {
146    pub name: Identifier<'a>,
147    pub params: Vec<FunctionParam<'a>>,
148    pub span: Span,
149}
150
151/// Parsed body of a stored function, with an offset for mapping spans
152/// back to the outer source file.
153#[derive(Debug)]
154pub struct FunctionDefBody<'a> {
155    /// Parsed statements from the function body
156    pub statements: Vec<Statement<'a>>,
157    /// The body source string (borrowed from the outer source)
158    pub src: &'a str,
159}
160
161/// A stored function definition
162#[derive(Debug)]
163pub struct FunctionDef<'a> {
164    pub name: Identifier<'a>,
165    pub params: Vec<FunctionParam<'a>>,
166    pub return_type: DataType<'a>,
167    pub span: Span,
168    /// Parsed body, present when the function was defined with a
169    /// dollar-quoted (non-escaped) AS body string.
170    pub body: Option<FunctionDefBody<'a>>,
171}
172
173#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
174pub struct IndexKey<'a> {
175    pub table: Option<Identifier<'a>>,
176    pub index: Identifier<'a>,
177}
178
179/// A user-defined type registered via `CREATE TYPE`
180#[derive(Debug)]
181pub enum TypeDef<'a> {
182    /// A PostgreSQL enum type
183    Enum {
184        values: Arc<Vec<Cow<'a, str>>>,
185        span: Span,
186    },
187}
188
189/// A description of tables, view, procedures and function in a schemas definition file
190#[derive(Debug, Default)]
191pub struct Schemas<'a> {
192    /// Map from name to Tables or views
193    pub schemas: BTreeMap<Identifier<'a>, Schema<'a>>,
194    /// Map from name to procedure
195    pub procedures: BTreeMap<Identifier<'a>, ProcedureDef<'a>>,
196    /// Map from name to function
197    pub functions: BTreeMap<Identifier<'a>, FunctionDef<'a>>,
198    /// Map from (table, index) to location
199    pub indices: BTreeMap<IndexKey<'a>, Span>,
200    /// Map from type name to type definition (e.g. enums created with `CREATE TYPE ... AS ENUM`)
201    pub types: BTreeMap<Identifier<'a>, TypeDef<'a>>,
202}
203
204/// Try to parse a borrowed string as SQL statements.
205/// Returns the parsed body if the string is a non-escaped borrow from `src`,
206/// or None if the string is escaped (Cow::Owned).
207fn try_parse_body<'a>(
208    src: &'a str,
209    body_str: &qusql_parse::SString<'a>,
210    issues: &mut Issues<'a>,
211    options: &qusql_parse::ParseOptions,
212) -> Option<FunctionDefBody<'a>> {
213    let Cow::Borrowed(borrowed) = &body_str.value else {
214        return None;
215    };
216    let span_offset = borrowed.as_ptr() as usize - src.as_ptr() as usize;
217    let body_options = options.clone().function_body(true).span_offset(span_offset);
218    let statements = parse_statements(borrowed, issues, &body_options);
219    Some(FunctionDefBody {
220        statements,
221        src: borrowed,
222    })
223}
224
225fn type_kind_from_parse<'a>(
226    type_: qusql_parse::Type<'a>,
227    unsigned: bool,
228    is_sqlite: bool,
229    types: Option<&BTreeMap<Identifier<'a>, TypeDef<'a>>>,
230    issues: &mut Issues<'a>,
231) -> Type<'a> {
232    match type_ {
233        qusql_parse::Type::TinyInt(v) => {
234            if !unsigned && matches!(v, Some((1, _))) {
235                BaseType::Bool.into()
236            } else if unsigned {
237                Type::U8
238            } else {
239                Type::I8
240            }
241        }
242        qusql_parse::Type::SmallInt(_) => {
243            if unsigned {
244                Type::U16
245            } else {
246                Type::I16
247            }
248        }
249        qusql_parse::Type::MediumInt(_) => {
250            if unsigned {
251                Type::U24
252            } else {
253                Type::I24
254            }
255        }
256        qusql_parse::Type::Int(_) => {
257            if unsigned {
258                Type::U32
259            } else {
260                Type::I32
261            }
262        }
263        qusql_parse::Type::BigInt(_) => {
264            if unsigned {
265                Type::U64
266            } else {
267                Type::I64
268            }
269        }
270        qusql_parse::Type::Char(_) => BaseType::String.into(),
271        qusql_parse::Type::VarChar(_) => BaseType::String.into(),
272        qusql_parse::Type::TinyText(_) => BaseType::String.into(),
273        qusql_parse::Type::MediumText(_) => BaseType::String.into(),
274        qusql_parse::Type::Text(_) => BaseType::String.into(),
275        qusql_parse::Type::LongText(_) => BaseType::String.into(),
276        qusql_parse::Type::Enum(e) => {
277            Type::Enum(Arc::new(e.into_iter().map(|s| s.value).collect()))
278        }
279        qusql_parse::Type::Set(s) => Type::Set(Arc::new(s.into_iter().map(|s| s.value).collect())),
280        qusql_parse::Type::Float(_) => Type::F32,
281        qusql_parse::Type::Double(_) => Type::F64,
282        qusql_parse::Type::DateTime(_) => BaseType::DateTime.into(),
283        qusql_parse::Type::Timestamp(_) => BaseType::TimeStamp.into(),
284        qusql_parse::Type::Time(_) => BaseType::Time.into(),
285        qusql_parse::Type::TinyBlob(_) => BaseType::Bytes.into(),
286        qusql_parse::Type::MediumBlob(_) => BaseType::Bytes.into(),
287        qusql_parse::Type::Date => BaseType::Date.into(),
288        qusql_parse::Type::Blob(_) => BaseType::Bytes.into(),
289        qusql_parse::Type::LongBlob(_) => BaseType::Bytes.into(),
290        qusql_parse::Type::VarBinary(_) => BaseType::Bytes.into(),
291        qusql_parse::Type::Binary(_) => BaseType::Bytes.into(),
292        qusql_parse::Type::Boolean => BaseType::Bool.into(),
293        qusql_parse::Type::Integer(_) => {
294            if is_sqlite {
295                BaseType::Integer.into()
296            } else {
297                Type::I32
298            }
299        }
300        qusql_parse::Type::Float8 => BaseType::Float.into(),
301        qusql_parse::Type::Numeric(ref v) => {
302            let span = v.as_ref().map(|(_, _, s)| s.clone()).unwrap_or(0..0);
303            issues.err("NUMERIC type is not yet supported", &span);
304            BaseType::Float.into()
305        }
306        qusql_parse::Type::Decimal(ref v) => {
307            let span = v.as_ref().map(|(_, _, s)| s.clone()).unwrap_or(0..0);
308            issues.err("DECIMAL type is not yet supported", &span);
309            BaseType::Float.into()
310        }
311        qusql_parse::Type::Timestamptz => BaseType::TimeStamp.into(),
312        qusql_parse::Type::Json => BaseType::String.into(),
313        qusql_parse::Type::Jsonb => BaseType::String.into(),
314        qusql_parse::Type::Bit(_, _) => BaseType::Bytes.into(),
315        qusql_parse::Type::VarBit(_) => BaseType::Bytes.into(),
316        qusql_parse::Type::Bytea => BaseType::Bytes.into(),
317        qusql_parse::Type::Named(qname) => {
318            // Look up user-defined types (e.g. enums created with CREATE TYPE ... AS ENUM).
319            // Only unqualified names are looked up; schema-qualified names (e.g. public.mytype)
320            // are not stored with a prefix in the types map.
321            if let Some(types) = types
322                && qname.prefix.is_empty()
323                && let Some(TypeDef::Enum { values, .. }) = types.get(qname.identifier.value)
324            {
325                Type::Enum(values.clone())
326            } else {
327                BaseType::String.into()
328            }
329        }
330        qusql_parse::Type::Inet4 => BaseType::String.into(),
331        qusql_parse::Type::Inet6 => BaseType::String.into(),
332        qusql_parse::Type::InetAddr => BaseType::String.into(),
333        qusql_parse::Type::Cidr => BaseType::String.into(),
334        qusql_parse::Type::Macaddr => BaseType::String.into(),
335        qusql_parse::Type::Macaddr8 => BaseType::String.into(),
336        qusql_parse::Type::Array(inner, _) => Type::Array(Box::new(type_kind_from_parse(
337            *inner, false, is_sqlite, types, issues,
338        ))),
339        qusql_parse::Type::Table(ref span, _) => {
340            issues.err("TABLE type is not yet supported", span);
341            BaseType::String.into()
342        }
343        qusql_parse::Type::Serial => Type::I32,
344        qusql_parse::Type::SmallSerial => Type::I16,
345        qusql_parse::Type::BigSerial => Type::I64,
346        qusql_parse::Type::Money => BaseType::Float.into(),
347        qusql_parse::Type::Timetz(_) => BaseType::Time.into(),
348        qusql_parse::Type::Interval(_) => BaseType::TimeInterval.into(),
349        qusql_parse::Type::TsQuery => BaseType::String.into(),
350        qusql_parse::Type::TsVector => BaseType::String.into(),
351        qusql_parse::Type::Uuid => BaseType::Uuid.into(),
352        qusql_parse::Type::Xml => BaseType::String.into(),
353        qusql_parse::Type::Range(sub) | qusql_parse::Type::MultiRange(sub) => {
354            use qusql_parse::RangeSubtype;
355            let elem = match sub {
356                RangeSubtype::Int4 => BaseType::Integer,
357                RangeSubtype::Int8 => BaseType::Integer,
358                RangeSubtype::Num => BaseType::Float,
359                RangeSubtype::Ts => BaseType::DateTime,
360                RangeSubtype::Tstz => BaseType::TimeStamp,
361                RangeSubtype::Date => BaseType::Date,
362            };
363            Type::Range(elem)
364        }
365        qusql_parse::Type::Point
366        | qusql_parse::Type::Line
367        | qusql_parse::Type::Lseg
368        | qusql_parse::Type::Box
369        | qusql_parse::Type::Path
370        | qusql_parse::Type::Polygon
371        | qusql_parse::Type::Circle => Type::Geometry,
372    }
373}
374
375pub(crate) fn parse_column<'a>(
376    data_type: DataType<'a>,
377    identifier: Identifier<'a>,
378    _issues: &mut Issues<'a>,
379    options: Option<&TypeOptions>,
380    types: Option<&BTreeMap<Identifier<'a>, TypeDef<'a>>>,
381) -> Column<'a> {
382    let mut not_null = false;
383    let mut unsigned = false;
384    let mut auto_increment = false;
385    let mut default = false;
386    let mut as_ = None;
387    let mut generated = false;
388    let mut primary_key = false;
389    let is_sqlite = options
390        .map(|v| v.parse_options.get_dialect().is_sqlite())
391        .unwrap_or_default();
392    for p in data_type.properties {
393        match p {
394            DataTypeProperty::Signed(_) => unsigned = false,
395            DataTypeProperty::Unsigned(_) => unsigned = true,
396            DataTypeProperty::Null(_) => not_null = false,
397            DataTypeProperty::NotNull(_) => not_null = true,
398            DataTypeProperty::AutoIncrement(_) => auto_increment = true,
399            DataTypeProperty::As((_, e)) => as_ = Some(e),
400            DataTypeProperty::Default(_) => default = true,
401            DataTypeProperty::GeneratedAlways(_) => generated = true,
402            DataTypeProperty::GeneratedAlwaysAsExpr { .. } => generated = true,
403            DataTypeProperty::PrimaryKey(_) => primary_key = true,
404            _ => {}
405        }
406    }
407    // SQLite INTEGER PRIMARY KEY is an alias for rowid (auto-increment)
408    if is_sqlite && primary_key && matches!(data_type.type_, qusql_parse::Type::Integer(_)) {
409        auto_increment = true;
410    }
411    // PRIMARY KEY implies NOT NULL
412    if primary_key {
413        not_null = true;
414    }
415    let type_ = type_kind_from_parse(data_type.type_, unsigned, is_sqlite, types, _issues);
416    Column {
417        identifier,
418        type_: FullType {
419            t: type_,
420            not_null,
421            list_hack: false,
422        },
423        auto_increment,
424        as_,
425        default,
426        generated,
427    }
428}
429
/// A runtime SQL value produced while evaluating a schema.
#[derive(Clone, Debug, PartialEq)]
enum SqlValue<'a> {
    Null,
    Bool(bool),
    Integer(i64),
    /// Text that is a slice of the SQL source itself, so span arithmetic still works.
    SourceText(&'a str),
    /// Text that was computed and is therefore owned.
    OwnedText(alloc::string::String),
}

impl<'a> SqlValue<'a> {
    /// Return the source slice if this value is [`SqlValue::SourceText`].
    fn as_source_text(&self) -> Option<&'a str> {
        match self {
            SqlValue::SourceText(text) => Some(*text),
            _ => None,
        }
    }

    /// Truthiness of the value: `NULL`, `false` and `0` are falsy,
    /// everything else (including any text) is truthy.
    fn is_truthy(&self) -> bool {
        !matches!(
            self,
            SqlValue::Null | SqlValue::Bool(false) | SqlValue::Integer(0)
        )
    }

    /// SQL-style equality.
    ///
    /// Returns `None` when either side is `NULL` or the two values are of
    /// kinds that are not compared (e.g. a boolean against an integer).
    /// Source text and owned text compare by content.
    fn sql_eq(&self, other: &SqlValue<'a>) -> Option<bool> {
        /// Borrow the textual content of a value, if it is one of the text variants.
        fn text_of<'v>(value: &'v SqlValue<'_>) -> Option<&'v str> {
            match value {
                SqlValue::SourceText(text) => Some(*text),
                SqlValue::OwnedText(text) => Some(text.as_str()),
                _ => None,
            }
        }

        match (self, other) {
            (SqlValue::Null, _) | (_, SqlValue::Null) => None,
            (SqlValue::Bool(lhs), SqlValue::Bool(rhs)) => Some(lhs == rhs),
            (SqlValue::Integer(lhs), SqlValue::Integer(rhs)) => Some(lhs == rhs),
            // Remaining comparable case: text against text, in any ownership combination.
            _ => Some(text_of(self)? == text_of(other)?),
        }
    }

    /// SQL-style `<=`, defined for two integers only; every other
    /// combination (including `NULL`) yields `None`.
    fn sql_lte(&self, other: &SqlValue<'a>) -> Option<bool> {
        if let (SqlValue::Integer(lhs), SqlValue::Integer(rhs)) = (self, other) {
            Some(lhs <= rhs)
        } else {
            None
        }
    }
}
481
482/// A single row of evaluated column values.
483type Row<'a> = Rc<Vec<(&'a str, SqlValue<'a>)>>;
484
485/// Processing context for schema evaluation: holds mutable schema state, issue
486/// sink, the source text for span-offset calculations, and parse/type options.
487struct SchemaCtx<'a, 'b> {
488    schemas: &'b mut Schemas<'a>,
489    issues: &'b mut Issues<'a>,
490    /// The source text slice that all spans inside `issues` refer to.
491    src: &'a str,
492    options: &'b TypeOptions,
493    /// Active function argument bindings: parameter name -> SQL value.
494    /// Set when evaluating a known function body.
495    bindings: BTreeMap<&'a str, SqlValue<'a>>,
496    /// In-memory row store for tables populated during schema evaluation.
497    rows: BTreeMap<&'a str, Vec<Row<'a>>>,
498    /// Table rows made available to aggregate functions during eval_condition.
499    /// Temporarily swapped via core::mem::take so eval functions can take &mut self.
500    current_table_rows: Vec<Row<'a>>,
501    /// The row currently being evaluated (e.g. during WHERE clause filtering).
502    /// Set by eval_select_matching_rows around each row's eval call.
503    current_row: Option<Row<'a>>,
504    /// The return value of the most recently executed RETURN statement.
505    /// Set by the Return arm in process_statement; consumed by eval_function_expr.
506    return_value: Option<SqlValue<'a>>,
507}
508
509impl<'a, 'b> SchemaCtx<'a, 'b> {
510    fn new(
511        schemas: &'b mut Schemas<'a>,
512        issues: &'b mut Issues<'a>,
513        src: &'a str,
514        options: &'b TypeOptions,
515    ) -> Self {
516        Self {
517            schemas,
518            issues,
519            src,
520            options,
521            bindings: Default::default(),
522            rows: Default::default(),
523            current_table_rows: Default::default(),
524            current_row: Default::default(),
525            return_value: None,
526        }
527    }
528
529    /// Process a list of top-level schema statements.  Each statement is
530    /// independent: errors in one do not stop processing of the next.
531    fn process_top_level_statements(&mut self, statements: Vec<qusql_parse::Statement<'a>>) {
532        for statement in statements {
533            let _ = self.process_statement(statement);
534        }
535    }
536
537    /// Process a list of statements in a block or function body, stopping at
538    /// the first `Err` (error or `RETURN`).
539    fn process_statements(
540        &mut self,
541        statements: Vec<qusql_parse::Statement<'a>>,
542    ) -> Result<(), ()> {
543        for statement in statements {
544            self.process_statement(statement)?;
545        }
546        Ok(())
547    }
548
549    fn process_statement(&mut self, statement: qusql_parse::Statement<'a>) -> Result<(), ()> {
550        match statement {
551            qusql_parse::Statement::CreateTable(t) => {
552                self.process_create_table(*t);
553                Ok(())
554            }
555            qusql_parse::Statement::CreateView(v) => {
556                self.process_create_view(*v);
557                Ok(())
558            }
559            qusql_parse::Statement::CreateFunction(f) => {
560                self.process_create_function(*f);
561                Ok(())
562            }
563            qusql_parse::Statement::CreateProcedure(p) => {
564                self.process_create_procedure(*p);
565                Ok(())
566            }
567            qusql_parse::Statement::CreateIndex(ci) => {
568                self.process_create_index(*ci);
569                Ok(())
570            }
571            qusql_parse::Statement::CreateTrigger(_) => Ok(()),
572            qusql_parse::Statement::CreateTypeEnum(s) => {
573                self.process_create_type_enum(*s);
574                Ok(())
575            }
576            qusql_parse::Statement::AlterTable(a) => {
577                self.process_alter_table(*a);
578                Ok(())
579            }
580            qusql_parse::Statement::DropTable(t) => {
581                self.process_drop_table(*t);
582                Ok(())
583            }
584            qusql_parse::Statement::DropView(v) => {
585                self.process_drop_view(*v);
586                Ok(())
587            }
588            qusql_parse::Statement::DropFunction(f) => {
589                self.process_drop_function(*f);
590                Ok(())
591            }
592            qusql_parse::Statement::DropProcedure(p) => {
593                self.process_drop_procedure(*p);
594                Ok(())
595            }
596            qusql_parse::Statement::DropIndex(ci) => {
597                self.process_drop_index(*ci);
598                Ok(())
599            }
600            qusql_parse::Statement::DropDatabase(s) => {
601                self.issues.err("not implemented", &s);
602                Err(())
603            }
604            qusql_parse::Statement::DropServer(s) => {
605                self.issues.err("not implemented", &s);
606                Err(())
607            }
608            qusql_parse::Statement::DropTrigger(_) => Ok(()),
609            qusql_parse::Statement::DropType(s) => {
610                self.process_drop_type(*s);
611                Ok(())
612            }
613            // Control-flow: recurse into all reachable branches.
614            qusql_parse::Statement::Do(d) => self.process_do(*d),
615            qusql_parse::Statement::Block(b) => self.process_statements(b.statements),
616            qusql_parse::Statement::If(i) => self.process_if(*i),
617            // SELECT: may call a known function whose body we can evaluate.
618            qusql_parse::Statement::Select(s) => self.process_select(*s),
619            // DML: track row insertions so conditions like EXISTS(...) can be evaluated.
620            qusql_parse::Statement::InsertReplace(i) => self.process_insert(*i),
621            // Transaction control: we assume all transactions commit.
622            qusql_parse::Statement::Commit(_) => Ok(()),
623            qusql_parse::Statement::Begin(_) => Ok(()),
624            // Statements with no schema effect.
625            qusql_parse::Statement::Grant(_) => Ok(()),
626            qusql_parse::Statement::CommentOn(_) => Ok(()),
627            qusql_parse::Statement::Analyze(_) => Ok(()),
628            // Variable / cursor plumbing — no schema effect.
629            qusql_parse::Statement::Set(_) => Ok(()),
630            // Assign and Perform may call known functions with schema effects.
631            qusql_parse::Statement::Assign(a) => {
632                for se in a.value.select_exprs {
633                    self.process_expression(se.expr)?;
634                }
635                Ok(())
636            }
637            qusql_parse::Statement::Perform(p) => self.process_expression(p.expr),
638            qusql_parse::Statement::DeclareVariable(d) => {
639                // Evaluate the DEFAULT expression for its side effects (may call user-defined functions).
640                if let Some((_, select)) = d.default {
641                    self.eval_condition(&select)?;
642                }
643                Ok(())
644            }
645            // DeclareHandler bodies only run on error; we model the happy path, so skip them.
646            qusql_parse::Statement::DeclareHandler(_) => Ok(()),
647            qusql_parse::Statement::ExecuteFunction(s) => {
648                self.issues.err("not implemented", &s);
649                Err(())
650            }
651            // RAISE EXCEPTION aborts execution; anything else is a log/notice with no schema effect.
652            qusql_parse::Statement::Raise(r) => {
653                if matches!(r.level, Some(qusql_parse::RaiseLevel::Exception(_))) {
654                    Err(())
655                } else {
656                    Ok(())
657                }
658            }
659            qusql_parse::Statement::Return(r) => {
660                self.return_value = self.eval_expr(&r.expr).ok();
661                Err(())
662            }
663            qusql_parse::Statement::PlpgsqlExecute(e) => self.process_plpgsql_execute(*e),
664            qusql_parse::Statement::Update(u) => self.process_update(*u),
665            qusql_parse::Statement::Delete(d) => self.process_delete(*d),
666            qusql_parse::Statement::AlterType(a) => {
667                self.process_alter_type(*a);
668                Ok(())
669            }
670            qusql_parse::Statement::TruncateTable(t) => {
671                self.process_truncate_table(*t);
672                Ok(())
673            }
674            qusql_parse::Statement::RenameTable(r) => {
675                self.process_rename_table(*r);
676                Ok(())
677            }
678            s => {
679                self.issues.err(
680                    alloc::format!("Unsupported statement {s:?} in schema definition"),
681                    &s,
682                );
683                Err(())
684            }
685        }
686    }
687
688    fn process_create_table(&mut self, t: qusql_parse::CreateTable<'a>) {
689        let mut replace = false;
690        let id = unqualified_name(self.issues, &t.identifier);
691        let mut schema = Schema {
692            view: false,
693            identifier_span: id.span.clone(),
694            columns: Default::default(),
695        };
696        for o in t.create_options {
697            match o {
698                qusql_parse::CreateOption::OrReplace(_) => replace = true,
699                qusql_parse::CreateOption::Temporary { temporary_span, .. } => {
700                    self.issues.err("Not supported", &temporary_span);
701                }
702                qusql_parse::CreateOption::Materialized(s) => {
703                    self.issues.err("Not supported", &s);
704                }
705                qusql_parse::CreateOption::Concurrently(s) => {
706                    self.issues.err("Not supported", &s);
707                }
708                qusql_parse::CreateOption::Unique(s) => {
709                    self.issues.err("Not supported", &s);
710                }
711                _ => {}
712            }
713        }
714        for d in t.create_definitions {
715            match d {
716                qusql_parse::CreateDefinition::ColumnDefinition {
717                    identifier,
718                    data_type,
719                } => {
720                    let column = parse_column(
721                        data_type,
722                        identifier.clone(),
723                        self.issues,
724                        Some(self.options),
725                        Some(&self.schemas.types),
726                    );
727                    if let Some(oc) = schema.get_column(column.identifier.value) {
728                        self.issues
729                            .err("Column already defined", &identifier)
730                            .frag("Defined here", &oc.identifier);
731                    } else {
732                        schema.columns.push(column);
733                    }
734                }
735                qusql_parse::CreateDefinition::IndexDefinition {
736                    index_type,
737                    index_name,
738                    cols,
739                    ..
740                } => {
741                    // Validate that every column referenced by the index exists.
742                    for col in &cols {
743                        if let qusql_parse::IndexColExpr::Column(cname) = &col.expr
744                            && schema.get_column(cname.value).is_none()
745                        {
746                            self.issues
747                                .err("No such column in table", col)
748                                .frag("Table defined here", &schema.identifier_span);
749                        }
750                    }
751                    // PRIMARY KEY implies NOT NULL on each listed column.
752                    if matches!(index_type, qusql_parse::IndexType::Primary(_)) {
753                        for col in &cols {
754                            if let qusql_parse::IndexColExpr::Column(cname) = &col.expr
755                                && let Some(c) = schema.get_column_mut(cname.value)
756                            {
757                                c.type_.not_null = true;
758                            }
759                        }
760                    }
761                    // Register named indices.
762                    if let Some(name) = index_name {
763                        let ident = if self.options.parse_options.get_dialect().is_postgresql() {
764                            IndexKey {
765                                table: None,
766                                index: name.clone(),
767                            }
768                        } else {
769                            IndexKey {
770                                table: Some(id.clone()),
771                                index: name.clone(),
772                            }
773                        };
774                        let span = name.span();
775                        if let Some(old) = self.schemas.indices.insert(ident, span) {
776                            self.issues
777                                .err("Multiple indices with the same identifier", &name)
778                                .frag("Already defined here", &old);
779                        }
780                    }
781                }
782                qusql_parse::CreateDefinition::ForeignKeyDefinition { .. } => {}
783                qusql_parse::CreateDefinition::CheckConstraintDefinition { .. } => {}
784                qusql_parse::CreateDefinition::LikeTable { source_table, .. } => {
785                    let source_id = unqualified_name(self.issues, &source_table);
786                    if let Some(src) = self.schemas.schemas.get(source_id) {
787                        let cols: Vec<Column<'a>> = src.columns.to_vec();
788                        for col in cols {
789                            if schema.get_column(col.identifier.value).is_none() {
790                                schema.columns.push(col);
791                            }
792                        }
793                    } else {
794                        self.issues.err("Table not found", &source_table);
795                    }
796                }
797            }
798        }
799        match self.schemas.schemas.entry(id.clone()) {
800            alloc::collections::btree_map::Entry::Occupied(mut e) => {
801                if replace {
802                    e.insert(schema);
803                } else if t.if_not_exists.is_none() {
804                    self.issues
805                        .err("Table already defined", &t.identifier)
806                        .frag("Defined here", &e.get().identifier_span);
807                }
808            }
809            alloc::collections::btree_map::Entry::Vacant(e) => {
810                e.insert(schema);
811            }
812        }
813    }
814
815    fn process_create_view(&mut self, v: qusql_parse::CreateView<'a>) {
816        let mut replace = false;
817        let mut schema = Schema {
818            view: true,
819            identifier_span: v.name.span(),
820            columns: Default::default(),
821        };
822        for o in v.create_options {
823            match o {
824                qusql_parse::CreateOption::OrReplace(_) => replace = true,
825                qusql_parse::CreateOption::Temporary { temporary_span, .. } => {
826                    self.issues.err("Not supported", &temporary_span);
827                }
828                qusql_parse::CreateOption::Materialized(s) => {
829                    self.issues.err("Not supported", &s);
830                }
831                qusql_parse::CreateOption::Concurrently(s) => {
832                    self.issues.err("Not supported", &s);
833                }
834                qusql_parse::CreateOption::Unique(s) => {
835                    self.issues.err("Not supported", &s);
836                }
837                _ => {}
838            }
839        }
840        {
841            let mut typer: crate::typer::Typer<'a, '_> = crate::typer::Typer {
842                schemas: self.schemas,
843                issues: self.issues,
844                reference_types: Vec::new(),
845                outer_reference_types: Vec::new(),
846                arg_types: Default::default(),
847                options: self.options,
848                with_schemas: Default::default(),
849            };
850            let t = type_statement::type_statement(&mut typer, &v.select);
851            let s = if let type_statement::InnerStatementType::Select(s) = t {
852                s
853            } else {
854                self.issues.err("Not supported", &v.select.span());
855                return;
856            };
857            for column in s.columns {
858                let Some(name) = column.name else {
859                    self.issues.err(
860                        "View column has no name; add an alias with AS",
861                        &v.select.span(),
862                    );
863                    continue;
864                };
865                schema.columns.push(Column {
866                    identifier: name,
867                    type_: column.type_,
868                    auto_increment: false,
869                    default: false,
870                    as_: None,
871                    generated: false,
872                });
873            }
874        }
875        match self
876            .schemas
877            .schemas
878            .entry(unqualified_name(self.issues, &v.name).clone())
879        {
880            alloc::collections::btree_map::Entry::Occupied(mut e) => {
881                if replace {
882                    e.insert(schema);
883                } else if v.if_not_exists.is_none() {
884                    self.issues
885                        .err("View already defined", &v.name)
886                        .frag("Defined here", &e.get().identifier_span);
887                }
888            }
889            alloc::collections::btree_map::Entry::Vacant(e) => {
890                e.insert(schema);
891            }
892        }
893    }
894
895    fn process_create_function(&mut self, f: qusql_parse::CreateFunction<'a>) {
896        let mut replace = false;
897        for o in &f.create_options {
898            if matches!(o, qusql_parse::CreateOption::OrReplace(_)) {
899                replace = true;
900            }
901        }
902        let body = f
903            .body
904            .as_ref()
905            .and_then(|b| b.strings.first())
906            .and_then(|s| try_parse_body(self.src, s, self.issues, &self.options.parse_options));
907        let name = f.name.clone();
908        let def = FunctionDef {
909            name: f.name.clone(),
910            params: f.params,
911            return_type: f.return_type,
912            span: f.create_span.join_span(&f.function_span),
913            body,
914        };
915        match self.schemas.functions.entry(name) {
916            alloc::collections::btree_map::Entry::Occupied(mut e) => {
917                if replace {
918                    e.insert(def);
919                } else if f.if_not_exists.is_none() {
920                    self.issues
921                        .err("Function already defined", &f.name)
922                        .frag("Defined here", &e.get().span);
923                }
924            }
925            alloc::collections::btree_map::Entry::Vacant(e) => {
926                e.insert(def);
927            }
928        }
929    }
930
931    fn process_create_procedure(&mut self, p: qusql_parse::CreateProcedure<'a>) {
932        let mut replace = false;
933        for o in &p.create_options {
934            if matches!(o, qusql_parse::CreateOption::OrReplace(_)) {
935                replace = true;
936            }
937        }
938        let name = p.name.clone();
939        let def = ProcedureDef {
940            name: p.name.clone(),
941            params: p.params,
942            span: p.create_span.join_span(&p.procedure_span),
943        };
944        match self.schemas.procedures.entry(name) {
945            alloc::collections::btree_map::Entry::Occupied(mut e) => {
946                if replace {
947                    e.insert(def);
948                } else if p.if_not_exists.is_none() {
949                    self.issues
950                        .err("Procedure already defined", &p.name)
951                        .frag("Defined here", &e.get().span);
952                }
953            }
954            alloc::collections::btree_map::Entry::Vacant(e) => {
955                e.insert(def);
956            }
957        }
958    }
959
960    fn process_create_index(&mut self, ci: qusql_parse::CreateIndex<'a>) {
961        let t = unqualified_name(self.issues, &ci.table_name);
962        if let Some(table) = self.schemas.schemas.get(t) {
963            for col in &ci.column_names {
964                if let qusql_parse::IndexColExpr::Column(name) = &col.expr
965                    && table.get_column(name.value).is_none()
966                {
967                    self.issues
968                        .err("No such column in table", col)
969                        .frag("Table defined here", &table.identifier_span);
970                }
971            }
972        } else {
973            self.issues.err("No such table", &ci.table_name);
974        }
975        let index_name = match &ci.index_name {
976            Some(name) => name.clone(),
977            None => return,
978        };
979        let ident = if self.options.parse_options.get_dialect().is_postgresql() {
980            IndexKey {
981                table: None,
982                index: index_name.clone(),
983            }
984        } else {
985            IndexKey {
986                table: Some(t.clone()),
987                index: index_name.clone(),
988            }
989        };
990        if let Some(old) = self.schemas.indices.insert(ident, ci.span())
991            && ci.if_not_exists.is_none()
992        {
993            self.issues
994                .err("Multiple indices with the same identifier", &ci)
995                .frag("Already defined here", &old);
996        }
997    }
998
999    fn process_create_type_enum(&mut self, s: qusql_parse::CreateTypeEnum<'a>) {
1000        let name = unqualified_name(self.issues, &s.name);
1001        let mut replace = false;
1002        for o in &s.create_options {
1003            if matches!(o, qusql_parse::CreateOption::OrReplace(_)) {
1004                replace = true;
1005            }
1006        }
1007        let values = Arc::new(s.values.into_iter().map(|v| v.value).collect());
1008        let typedef = TypeDef::Enum {
1009            values,
1010            span: s.as_enum_span,
1011        };
1012        match self.schemas.types.entry(name.clone()) {
1013            alloc::collections::btree_map::Entry::Occupied(mut e) => {
1014                if replace {
1015                    e.insert(typedef);
1016                }
1017                // Otherwise silently skip - SQL uses EXCEPTION WHEN duplicate_object to handle re-runs
1018            }
1019            alloc::collections::btree_map::Entry::Vacant(e) => {
1020                e.insert(typedef);
1021            }
1022        }
1023    }
1024
1025    fn process_drop_type(&mut self, s: qusql_parse::DropType<'a>) {
1026        let if_exists = s.if_exists;
1027        for name in s.names {
1028            let id = unqualified_name(self.issues, &name);
1029            if self.schemas.types.remove(id).is_none() && if_exists.is_none() {
1030                self.issues.err("Type not found", &name);
1031            }
1032        }
1033    }
1034
1035    fn process_alter_table(&mut self, a: qusql_parse::AlterTable<'a>) {
1036        let e = match self
1037            .schemas
1038            .schemas
1039            .entry(unqualified_name(self.issues, &a.table).clone())
1040        {
1041            alloc::collections::btree_map::Entry::Occupied(e) => {
1042                let e = e.into_mut();
1043                if e.view {
1044                    self.issues.err("Cannot alter view", &a.table);
1045                    return;
1046                }
1047                e
1048            }
1049            alloc::collections::btree_map::Entry::Vacant(_) => {
1050                if a.if_exists.is_none() {
1051                    self.issues.err("Table not found", &a.table);
1052                }
1053                return;
1054            }
1055        };
1056        for s in a.alter_specifications {
1057            process_alter_specification(
1058                s,
1059                e,
1060                &a.table,
1061                self.issues,
1062                &mut self.schemas.indices,
1063                self.options,
1064                &self.schemas.types,
1065            );
1066        }
1067    }
1068}
1069
1070#[allow(clippy::too_many_lines)]
1071fn process_alter_specification<'a>(
1072    s: qusql_parse::AlterSpecification<'a>,
1073    e: &mut Schema<'a>,
1074    table_ref: &qusql_parse::QualifiedName<'a>,
1075    issues: &mut Issues<'a>,
1076    indices: &mut alloc::collections::BTreeMap<IndexKey<'a>, Span>,
1077    options: &TypeOptions,
1078    types: &BTreeMap<Identifier<'a>, TypeDef<'a>>,
1079) {
1080    match s {
1081        qusql_parse::AlterSpecification::AddIndex(AddIndex {
1082            if_not_exists,
1083            name,
1084            cols,
1085            index_type,
1086            ..
1087        }) => {
1088            for col in &cols {
1089                if let qusql_parse::IndexColExpr::Column(cname) = &col.expr
1090                    && e.get_column(cname.value).is_none()
1091                {
1092                    issues
1093                        .err("No such column in table", col)
1094                        .frag("Table defined here", table_ref);
1095                }
1096            }
1097            // PRIMARY KEY implies NOT NULL on each listed column.
1098            if matches!(index_type, qusql_parse::IndexType::Primary(_)) {
1099                for col in &cols {
1100                    if let qusql_parse::IndexColExpr::Column(cname) = &col.expr
1101                        && let Some(c) = e.get_column_mut(cname.value)
1102                    {
1103                        c.type_.not_null = true;
1104                    }
1105                }
1106            }
1107            if let Some(name) = &name {
1108                let ident = if options.parse_options.get_dialect().is_postgresql() {
1109                    IndexKey {
1110                        table: None,
1111                        index: name.clone(),
1112                    }
1113                } else {
1114                    IndexKey {
1115                        table: Some(unqualified_name(issues, table_ref).clone()),
1116                        index: name.clone(),
1117                    }
1118                };
1119                if let Some(old) = indices.insert(ident, name.span())
1120                    && if_not_exists.is_none()
1121                {
1122                    issues
1123                        .err("Multiple indices with the same identifier", &name.span())
1124                        .frag("Already defined here", &old);
1125                }
1126            }
1127        }
1128        qusql_parse::AlterSpecification::AddForeignKey { .. } => {}
1129        qusql_parse::AlterSpecification::Modify(ModifyColumn {
1130            if_exists,
1131            col,
1132            definition,
1133            ..
1134        }) => match e.get_column_mut(col.value) {
1135            Some(c) => {
1136                let new_col = parse_column(
1137                    definition,
1138                    c.identifier.clone(),
1139                    issues,
1140                    Some(options),
1141                    Some(types),
1142                );
1143                *c = new_col;
1144            }
1145            None if if_exists.is_none() => {
1146                issues
1147                    .err("No such column in table", &col)
1148                    .frag("Table defined here", &e.identifier_span);
1149            }
1150            None => {}
1151        },
1152        qusql_parse::AlterSpecification::AddColumn(AddColumn {
1153            identifier,
1154            data_type,
1155            if_not_exists_span,
1156            ..
1157        }) => {
1158            if e.get_column(identifier.value).is_some() {
1159                if if_not_exists_span.is_none() {
1160                    issues
1161                        .err("Column already exists in table", &identifier)
1162                        .frag("Table defined here", &e.identifier_span);
1163                }
1164            } else {
1165                e.columns.push(parse_column(
1166                    data_type,
1167                    identifier,
1168                    issues,
1169                    Some(options),
1170                    Some(types),
1171                ));
1172            }
1173        }
1174        qusql_parse::AlterSpecification::OwnerTo { .. } => {}
1175        qusql_parse::AlterSpecification::DropColumn(DropColumn {
1176            column, if_exists, ..
1177        }) => {
1178            let cnt = e.columns.len();
1179            e.columns.retain(|c| c.identifier != column);
1180            if cnt == e.columns.len() && if_exists.is_none() {
1181                issues
1182                    .err("No such column in table", &column)
1183                    .frag("Table defined here", &e.identifier_span);
1184            }
1185        }
1186        qusql_parse::AlterSpecification::AlterColumn(AlterColumn {
1187            column,
1188            alter_column_action,
1189            ..
1190        }) => match e.get_column_mut(column.value) {
1191            Some(c) => match alter_column_action {
1192                qusql_parse::AlterColumnAction::SetDefault { .. } => c.default = true,
1193                qusql_parse::AlterColumnAction::DropDefault { .. } => c.default = false,
1194                qusql_parse::AlterColumnAction::Type { type_, .. } => {
1195                    *c = parse_column(type_, column, issues, Some(options), Some(types));
1196                }
1197                qusql_parse::AlterColumnAction::SetNotNull { .. } => c.type_.not_null = true,
1198                qusql_parse::AlterColumnAction::DropNotNull { .. } => c.type_.not_null = false,
1199                a @ qusql_parse::AlterColumnAction::AddGenerated { .. } => {
1200                    issues.err("not implemented", &a);
1201                }
1202            },
1203            None => {
1204                issues
1205                    .err("No such column in table", &column)
1206                    .frag("Table defined here", &e.identifier_span);
1207            }
1208        },
1209        qusql_parse::AlterSpecification::DropIndex(drop_idx) => {
1210            let is_postgresql = options.parse_options.get_dialect().is_postgresql();
1211            let key = if is_postgresql {
1212                IndexKey {
1213                    table: None,
1214                    index: drop_idx.name.clone(),
1215                }
1216            } else {
1217                IndexKey {
1218                    table: Some(unqualified_name(issues, table_ref).clone()),
1219                    index: drop_idx.name.clone(),
1220                }
1221            };
1222            if indices.remove(&key).is_none() {
1223                issues.err("No such index to drop", &drop_idx.name);
1224            }
1225        }
1226        qusql_parse::AlterSpecification::RenameColumn(qusql_parse::RenameColumn {
1227            old_col_name,
1228            new_col_name,
1229            ..
1230        }) => match e.get_column_mut(old_col_name.value) {
1231            Some(c) => c.identifier = new_col_name,
1232            None => {
1233                issues
1234                    .err("No such column in table", &old_col_name)
1235                    .frag("Table defined here", &e.identifier_span);
1236            }
1237        },
1238        qusql_parse::AlterSpecification::RenameIndex(qusql_parse::RenameIndex {
1239            old_index_name,
1240            new_index_name,
1241            ..
1242        }) => {
1243            let is_postgresql = options.parse_options.get_dialect().is_postgresql();
1244            let table_id = unqualified_name(issues, table_ref).clone();
1245            let old_key = if is_postgresql {
1246                IndexKey {
1247                    table: None,
1248                    index: old_index_name.clone(),
1249                }
1250            } else {
1251                IndexKey {
1252                    table: Some(table_id.clone()),
1253                    index: old_index_name.clone(),
1254                }
1255            };
1256            match indices.remove(&old_key) {
1257                Some(span) => {
1258                    let new_key = if is_postgresql {
1259                        IndexKey {
1260                            table: None,
1261                            index: new_index_name,
1262                        }
1263                    } else {
1264                        IndexKey {
1265                            table: Some(table_id),
1266                            index: new_index_name,
1267                        }
1268                    };
1269                    indices.insert(new_key, span);
1270                }
1271                None => {
1272                    issues.err("No such index to rename", &old_index_name);
1273                }
1274            }
1275        }
1276        // We do not track constraints, so RENAME CONSTRAINT is a no-op.
1277        qusql_parse::AlterSpecification::RenameConstraint(_) => {}
1278        // RENAME TO inside ALTER TABLE requires renaming the table's map key, which is
1279        // not accessible here.  The standalone RENAME TABLE statement (handled by
1280        // process_rename_table) should be preferred; this variant is left as a no-op.
1281        qusql_parse::AlterSpecification::RenameTo(_) => {}
1282        qusql_parse::AlterSpecification::Change(qusql_parse::Change {
1283            column,
1284            new_column,
1285            definition,
1286            ..
1287        }) => match e.get_column_mut(column.value) {
1288            Some(c) => {
1289                *c = parse_column(definition, new_column, issues, Some(options), Some(types));
1290            }
1291            None => {
1292                issues
1293                    .err("No such column in table", &column)
1294                    .frag("Table defined here", &e.identifier_span);
1295            }
1296        },
1297        qusql_parse::AlterSpecification::Lock { .. }
1298        | qusql_parse::AlterSpecification::DropForeignKey { .. }
1299        | qusql_parse::AlterSpecification::DropPrimaryKey { .. }
1300        | qusql_parse::AlterSpecification::Algorithm { .. }
1301        | qusql_parse::AlterSpecification::AutoIncrement { .. }
1302        | qusql_parse::AlterSpecification::ReplicaIdentity(_)
1303        | qusql_parse::AlterSpecification::ValidateConstraint(_)
1304        | qusql_parse::AlterSpecification::AddTableConstraint(_)
1305        | qusql_parse::AlterSpecification::DisableTrigger(_)
1306        | qusql_parse::AlterSpecification::EnableTrigger(_)
1307        | qusql_parse::AlterSpecification::DisableRule(_)
1308        | qusql_parse::AlterSpecification::EnableRule(_)
1309        | qusql_parse::AlterSpecification::DisableRowLevelSecurity(_)
1310        | qusql_parse::AlterSpecification::EnableRowLevelSecurity(_)
1311        | qusql_parse::AlterSpecification::ForceRowLevelSecurity(_)
1312        | qusql_parse::AlterSpecification::NoForceRowLevelSecurity(_) => {}
1313    }
1314}
1315
1316impl<'a, 'b> SchemaCtx<'a, 'b> {
1317    fn process_drop_table(&mut self, t: qusql_parse::DropTable<'a>) {
1318        for i in t.tables {
1319            match self
1320                .schemas
1321                .schemas
1322                .entry(unqualified_name(self.issues, &i).clone())
1323            {
1324                alloc::collections::btree_map::Entry::Occupied(e) => {
1325                    if e.get().view {
1326                        self.issues
1327                            .err("Name defines a view not a table", &i)
1328                            .frag("View defined here", &e.get().identifier_span);
1329                    } else {
1330                        e.remove();
1331                    }
1332                }
1333                alloc::collections::btree_map::Entry::Vacant(_) => {
1334                    if t.if_exists.is_none() {
1335                        self.issues
1336                            .err("A table with this name does not exist to drop", &i);
1337                    }
1338                }
1339            }
1340        }
1341    }
1342
1343    fn process_drop_view(&mut self, v: qusql_parse::DropView<'a>) {
1344        for i in v.views {
1345            match self
1346                .schemas
1347                .schemas
1348                .entry(unqualified_name(self.issues, &i).clone())
1349            {
1350                alloc::collections::btree_map::Entry::Occupied(e) => {
1351                    if !e.get().view {
1352                        self.issues
1353                            .err("Name defines a table not a view", &i)
1354                            .frag("Table defined here", &e.get().identifier_span);
1355                    } else {
1356                        e.remove();
1357                    }
1358                }
1359                alloc::collections::btree_map::Entry::Vacant(_) => {
1360                    if v.if_exists.is_none() {
1361                        self.issues
1362                            .err("A view with this name does not exist to drop", &i);
1363                    }
1364                }
1365            }
1366        }
1367    }
1368
1369    fn process_drop_function(&mut self, f: qusql_parse::DropFunction<'a>) {
1370        for (func_name, _args) in &f.functions {
1371            match self
1372                .schemas
1373                .functions
1374                .entry(unqualified_name(self.issues, func_name).clone())
1375            {
1376                alloc::collections::btree_map::Entry::Occupied(e) => {
1377                    e.remove();
1378                }
1379                alloc::collections::btree_map::Entry::Vacant(_) => {
1380                    if f.if_exists.is_none() {
1381                        self.issues.err(
1382                            "A function with this name does not exist to drop",
1383                            func_name,
1384                        );
1385                    }
1386                }
1387            }
1388        }
1389    }
1390
1391    fn process_drop_procedure(&mut self, p: qusql_parse::DropProcedure<'a>) {
1392        let name = unqualified_name(self.issues, &p.procedure);
1393        match self.schemas.procedures.entry(name.clone()) {
1394            alloc::collections::btree_map::Entry::Occupied(e) => {
1395                e.remove();
1396            }
1397            alloc::collections::btree_map::Entry::Vacant(_) => {
1398                if p.if_exists.is_none() {
1399                    self.issues.err(
1400                        "A procedure with this name does not exist to drop",
1401                        &p.procedure,
1402                    );
1403                }
1404            }
1405        }
1406    }
1407
1408    fn process_drop_index(&mut self, ci: qusql_parse::DropIndex<'a>) {
1409        let key = IndexKey {
1410            table: ci.on.as_ref().map(|(_, t)| t.identifier.clone()),
1411            index: ci.index_name.clone(),
1412        };
1413        if self.schemas.indices.remove(&key).is_none() && ci.if_exists.is_none() {
1414            self.issues.err("No such index", &ci);
1415        }
1416    }
1417    /// DO $$ ... $$: re-parse the dollar-quoted body and recurse.
1418    fn process_do(&mut self, d: qusql_parse::Do<'a>) -> Result<(), ()> {
1419        match d.body {
1420            qusql_parse::DoBody::Statements(stmts) => self.process_statements(stmts)?,
1421            qusql_parse::DoBody::String(s, _) => {
1422                let span_offset = s.as_ptr() as usize - self.src.as_ptr() as usize;
1423                let body_opts = self
1424                    .options
1425                    .parse_options
1426                    .clone()
1427                    .function_body(true)
1428                    .span_offset(span_offset);
1429                let stmts = parse_statements(s, self.issues, &body_opts);
1430                self.process_statements(stmts)?;
1431            }
1432        }
1433        Ok(())
1434    }
1435
1436    /// IF ... THEN / ELSEIF / ELSE: recurse into all branches.
1437    fn process_if(&mut self, i: qusql_parse::If<'a>) -> Result<(), ()> {
1438        for cond in i.conditions {
1439            if self.eval_condition(&cond.search_condition)? {
1440                return self.process_statements(cond.then);
1441            }
1442        }
1443        if let Some((_, stmts)) = i.else_ {
1444            return self.process_statements(stmts);
1445        }
1446        Ok(())
1447    }
1448
1449    /// SELECT used at the top level of a schema file must be a bare list of
1450    /// function calls with no FROM / WHERE / LIMIT / etc.  Each expression is
1451    /// dispatched to `process_expression`.
1452    fn process_select(&mut self, s: qusql_parse::Select<'a>) -> Result<(), ()> {
1453        // Reject anything that looks like a real query.
1454        if let Some(span) = &s.from_span {
1455            self.issues
1456                .err("SELECT with FROM is not supported at schema level", span);
1457            return Err(());
1458        }
1459        if let Some((_, span)) = &s.where_ {
1460            self.issues
1461                .err("SELECT with WHERE is not supported at schema level", span);
1462            return Err(());
1463        }
1464        if let Some((span, _)) = &s.group_by {
1465            self.issues.err(
1466                "SELECT with GROUP BY is not supported at schema level",
1467                span,
1468            );
1469            return Err(());
1470        }
1471        if let Some((_, span)) = &s.having {
1472            self.issues
1473                .err("SELECT with HAVING is not supported at schema level", span);
1474            return Err(());
1475        }
1476        if let Some((span, _, _)) = &s.limit {
1477            self.issues
1478                .err("SELECT with LIMIT is not supported at schema level", span);
1479            return Err(());
1480        }
1481        if let Some((span, _)) = &s.order_by {
1482            self.issues.err(
1483                "SELECT with ORDER BY is not supported at schema level",
1484                span,
1485            );
1486            return Err(());
1487        }
1488        for se in s.select_exprs {
1489            self.process_expression(se.expr)?;
1490        }
1491        Ok(())
1492    }
1493
1494    fn process_expression(&mut self, expr: Expression<'a>) -> Result<(), ()> {
1495        self.eval_expr(&expr).map(|_| ())
1496    }
1497
1498    fn process_update(&mut self, u: qusql_parse::Update<'a>) -> Result<(), ()> {
1499        // We cannot evaluate SET expressions. Error out if any target table has
1500        // tracked rows whose state would be changed; if there are no tracked
1501        // rows the update has no effect on our model.
1502        let span = u.update_span;
1503        for tref in u.tables {
1504            if let qusql_parse::TableReference::Table { identifier, .. } = tref
1505                && identifier.prefix.is_empty()
1506                && self
1507                    .rows
1508                    .get(identifier.identifier.value)
1509                    .is_some_and(|r| !r.is_empty())
1510            {
1511                self.issues.err(
1512                    "UPDATE on a table with tracked rows is not supported in schema evaluator",
1513                    &span,
1514                );
1515                return Err(());
1516            }
1517        }
1518        Ok(())
1519    }
1520
1521    fn process_delete(&mut self, d: qusql_parse::Delete<'a>) -> Result<(), ()> {
1522        let qusql_parse::Delete {
1523            tables,
1524            using,
1525            where_,
1526            order_by,
1527            limit,
1528            ..
1529        } = d;
1530
1531        // USING / ORDER BY / LIMIT are not supported: error if any target table
1532        // has tracked rows that would be affected.
1533        let has_unsupported = !using.is_empty() || order_by.is_some() || limit.is_some();
1534        if has_unsupported {
1535            for table in &tables {
1536                if table.prefix.is_empty()
1537                    && self
1538                        .rows
1539                        .get(table.identifier.value)
1540                        .is_some_and(|r| !r.is_empty())
1541                {
1542                    self.issues.err(
1543                        "DELETE with USING/ORDER BY/LIMIT on a table with tracked rows \
1544                         is not supported in schema evaluator",
1545                        table,
1546                    );
1547                    return Err(());
1548                }
1549            }
1550        }
1551
1552        if let Some((where_expr, _)) = where_ {
1553            // Evaluate the WHERE for each tracked row; keep rows that do NOT match.
1554            for table in &tables {
1555                if table.prefix.is_empty() {
1556                    let name = table.identifier.value;
1557                    let Some(source_rows) = self.rows.get(name) else {
1558                        continue;
1559                    };
1560                    let source_rows = source_rows.clone();
1561                    let mut new_rows = Vec::new();
1562                    for row in source_rows {
1563                        let saved = self.current_row.replace(row.clone());
1564                        let matches = self.eval_expr(&where_expr).map(|v| v.is_truthy());
1565                        self.current_row = saved;
1566                        match matches {
1567                            Ok(true) => {} // row is deleted
1568                            Ok(false) => new_rows.push(row),
1569                            Err(()) => return Err(()),
1570                        }
1571                    }
1572                    self.rows.insert(name, new_rows);
1573                }
1574            }
1575        } else {
1576            // No WHERE - all rows in every target table are deleted.
1577            for table in tables {
1578                if table.prefix.is_empty() {
1579                    self.rows.remove(table.identifier.value);
1580                }
1581            }
1582        }
1583        Ok(())
1584    }
1585
1586    fn process_alter_type(&mut self, a: qusql_parse::AlterType<'a>) {
1587        let name = unqualified_name(self.issues, &a.name);
1588        match a.action {
1589            qusql_parse::AlterTypeAction::AddValue {
1590                if_not_exists_span,
1591                new_enum_value,
1592                ..
1593            } => {
1594                let Some(TypeDef::Enum { values, .. }) = self.schemas.types.get_mut(name) else {
1595                    self.issues.err("Type not found", &a.name);
1596                    return;
1597                };
1598                let new_val: Cow<'a, str> = new_enum_value.value;
1599                if values.contains(&new_val) {
1600                    if if_not_exists_span.is_none() {
1601                        self.issues.err("Enum value already exists", &a.name);
1602                    }
1603                    // IF NOT EXISTS: silently skip
1604                } else {
1605                    Arc::make_mut(values).push(new_val);
1606                }
1607            }
1608            qusql_parse::AlterTypeAction::RenameTo { new_name, .. } => {
1609                if let Some(typedef) = self.schemas.types.remove(name) {
1610                    self.schemas.types.insert(new_name, typedef);
1611                } else {
1612                    self.issues.err("Type not found", &a.name);
1613                }
1614            }
1615            qusql_parse::AlterTypeAction::RenameValue {
1616                existing_enum_value,
1617                new_enum_value,
1618                ..
1619            } => {
1620                let Some(TypeDef::Enum { values, .. }) = self.schemas.types.get_mut(name) else {
1621                    self.issues.err("Type not found", &a.name);
1622                    return;
1623                };
1624                let old_val: Cow<'a, str> = existing_enum_value.value;
1625                let new_val: Cow<'a, str> = new_enum_value.value;
1626                if let Some(entry) = Arc::make_mut(values).iter_mut().find(|v| **v == old_val) {
1627                    *entry = new_val;
1628                } else {
1629                    self.issues.err("Enum value not found", &a.name);
1630                }
1631            }
1632            // Other ALTER TYPE actions (OWNER TO, SET SCHEMA, etc.) have no effect on
1633            // the parts of the schema we track.
1634            _ => {}
1635        }
1636    }
1637
1638    fn process_truncate_table(&mut self, t: qusql_parse::TruncateTable<'a>) {
1639        for spec in t.tables {
1640            let name = unqualified_name(self.issues, &spec.table_name);
1641            self.rows.remove(name.value);
1642        }
1643    }
1644
1645    fn process_rename_table(&mut self, r: qusql_parse::RenameTable<'a>) {
1646        for pair in r.table_to_tables {
1647            let old_id = unqualified_name(self.issues, &pair.table);
1648            let new_id = unqualified_name(self.issues, &pair.new_table);
1649            // Rename in schemas map.
1650            if let Some(schema) = self.schemas.schemas.remove(old_id) {
1651                self.schemas.schemas.insert(new_id.clone(), schema);
1652            } else {
1653                self.issues.err("Table not found", &pair.table);
1654            }
1655            // Rename tracked rows if present.
1656            if let Some(rows) = self.rows.remove(old_id.value) {
1657                self.rows.insert(new_id.value, rows);
1658            }
1659        }
1660    }
1661
1662    fn process_insert(&mut self, i: qusql_parse::InsertReplace<'a>) -> Result<(), ()> {
1663        // Only unqualified table names are tracked.
1664        let table_name = match i.table.prefix.as_slice() {
1665            [] => i.table.identifier.value,
1666            _ => return Ok(()),
1667        };
1668        let col_names: Vec<&'a str> = i.columns.iter().map(|c| c.value).collect();
1669
1670        if let Some(set) = i.set {
1671            // INSERT ... SET col = expr, ...: evaluate as a single-row VALUES insert.
1672            let mut row: Vec<(&'a str, SqlValue<'a>)> = Vec::new();
1673            for pair in set.pairs {
1674                if let Ok(val) = self.eval_expr(&pair.value) {
1675                    row.push((pair.column.value, val));
1676                }
1677            }
1678            self.rows.entry(table_name).or_default().push(Rc::new(row));
1679            return Ok(());
1680        }
1681
1682        if let Some((_, value_rows)) = i.values {
1683            // INSERT ... VALUES (...)
1684            for row_exprs in value_rows {
1685                let mut row: Vec<(&'a str, SqlValue<'a>)> = Vec::new();
1686                for (col, expr) in col_names.iter().zip(row_exprs.iter()) {
1687                    if let Ok(val) = self.eval_expr(expr) {
1688                        row.push((col, val));
1689                    }
1690                }
1691                self.rows.entry(table_name).or_default().push(Rc::new(row));
1692            }
1693            return Ok(());
1694        }
1695
1696        let Some(select_stmt) = i.select else {
1697            return Ok(());
1698        };
1699        // Clone select expressions before eval borrows `self`.
1700        // Only available for a plain SELECT; compound queries (UNION etc.) produce no
1701        // named expressions to project, so we track rows without column values.
1702        let select_exprs: Vec<_> = if let qusql_parse::Statement::Select(s) = &select_stmt {
1703            s.select_exprs.iter().map(|se| se.expr.clone()).collect()
1704        } else {
1705            Vec::new()
1706        };
1707        let source_rows = self.eval_statement_rows(&select_stmt)?;
1708        for source_row in source_rows {
1709            let saved_row = self.current_row.replace(source_row);
1710            let mut row: Vec<(&'a str, SqlValue<'a>)> = Vec::new();
1711            for (col, expr) in col_names.iter().zip(select_exprs.iter()) {
1712                if let Ok(val) = self.eval_expr(expr) {
1713                    row.push((col, val));
1714                }
1715            }
1716            self.current_row = saved_row;
1717            self.rows.entry(table_name).or_default().push(Rc::new(row));
1718        }
1719        Ok(())
1720    }
1721
1722    /// Evaluate an expression to a `SqlValue`.
1723    /// Reads the current row from `self.current_row` (set by eval_select_matching_rows).
1724    /// Aggregate functions read rows from `self.current_table_rows` (set by eval_condition).
1725    /// Returns `Err(())` for expression types not yet handled by the evaluator.
1726    fn eval_expr(&mut self, expr: &Expression<'a>) -> Result<SqlValue<'a>, ()> {
1727        match expr {
1728            Expression::Null(_) => Ok(SqlValue::Null),
1729            Expression::Bool(b) => Ok(SqlValue::Bool(b.value)),
1730            Expression::Integer(i) => Ok(SqlValue::Integer(i.value as i64)),
1731            Expression::String(s) => Ok(match &s.value {
1732                Cow::Borrowed(b) => SqlValue::SourceText(b),
1733                Cow::Owned(o) => SqlValue::OwnedText(o.clone()),
1734            }),
1735            Expression::Identifier(id) => {
1736                if let [IdentifierPart::Name(name)] = id.parts.as_slice() {
1737                    self.bindings
1738                        .get(name.value)
1739                        .cloned()
1740                        .or_else(|| {
1741                            self.current_row.as_ref().and_then(|r| {
1742                                r.iter()
1743                                    .find(|(k, _)| *k == name.value)
1744                                    .map(|(_, v)| v.clone())
1745                            })
1746                        })
1747                        .ok_or(())
1748                } else {
1749                    Err(())
1750                }
1751            }
1752            Expression::Exists(e) => Ok(SqlValue::Bool(self.eval_exists(&e.subquery)?)),
1753            Expression::Function(f) => self.eval_function_expr(f),
1754            Expression::AggregateFunction(f) => self.eval_aggregate(f),
1755            Expression::Binary(b) => self.eval_binary_expr(b),
1756            _ => {
1757                self.issues
1758                    .err("Unimplemented expression in schema evaluator", expr);
1759                Err(())
1760            }
1761        }
1762    }
1763
1764    fn eval_exists(&mut self, stmt: &qusql_parse::Statement<'a>) -> Result<bool, ()> {
1765        Ok(!self.eval_statement_rows(stmt)?.is_empty())
1766    }
1767
1768    /// Resolve the rows for a SELECT's FROM clause.
1769    /// Returns empty if there is no FROM clause.
1770    /// Errors (with a message) for:
1771    ///   - joins or non-table FROM references (e.g. subqueries in FROM)
1772    ///   - qualified table names (e.g. `information_schema.columns`)
1773    ///   - unqualified table names not known to the schema evaluator
1774    fn resolve_from_rows(&mut self, s: &qusql_parse::Select<'a>) -> Result<Vec<Row<'a>>, ()> {
1775        use qusql_parse::TableReference;
1776        let Some(refs) = s.table_references.as_deref() else {
1777            return Ok(Vec::new());
1778        };
1779        let [TableReference::Table { identifier, .. }] = refs else {
1780            self.issues.err(
1781                "FROM clause with joins or subqueries is not supported in schema evaluator",
1782                &refs[0],
1783            );
1784            return Err(());
1785        };
1786        if !identifier.prefix.is_empty() {
1787            // Synthesize information_schema.columns from the current schema state.
1788            if identifier.prefix.len() == 1
1789                && identifier.prefix[0]
1790                    .0
1791                    .value
1792                    .eq_ignore_ascii_case("information_schema")
1793                && identifier.identifier.value.eq_ignore_ascii_case("columns")
1794            {
1795                let rows = self
1796                    .schemas
1797                    .schemas
1798                    .iter()
1799                    .flat_map(|(table_id, schema)| {
1800                        schema.columns.iter().map(move |col| {
1801                            Rc::new(alloc::vec![
1802                                ("table_name", SqlValue::SourceText(table_id.value)),
1803                                ("column_name", SqlValue::SourceText(col.identifier.value)),
1804                            ])
1805                        })
1806                    })
1807                    .collect();
1808                return Ok(rows);
1809            }
1810            self.issues.err(
1811                "Qualified table name in FROM clause is not supported in schema evaluator",
1812                identifier,
1813            );
1814            return Err(());
1815        }
1816        let name = identifier.identifier.value;
1817        let known =
1818            self.rows.contains_key(name) || self.schemas.schemas.keys().any(|k| k.value == name);
1819        if !known {
1820            self.issues.err(
1821                alloc::format!("Unknown table `{name}` referenced in schema evaluator"),
1822                &identifier.identifier,
1823            );
1824            return Err(());
1825        }
1826        Ok(self.rows.get(name).cloned().unwrap_or_default())
1827    }
1828
1829    /// Evaluate any SELECT-like statement (plain SELECT or UNION/INTERSECT/EXCEPT compound
1830    /// query) to a list of result rows.
1831    fn eval_statement_rows(
1832        &mut self,
1833        stmt: &qusql_parse::Statement<'a>,
1834    ) -> Result<Vec<Row<'a>>, ()> {
1835        match stmt {
1836            qusql_parse::Statement::Select(s) => self.eval_select_matching_rows(s),
1837            qusql_parse::Statement::CompoundQuery(cq) => self.eval_compound_query_rows(cq),
1838            _ => {
1839                self.issues
1840                    .err("Unsupported statement kind in INSERT ... SELECT", stmt);
1841                Err(())
1842            }
1843        }
1844    }
1845
1846    /// Evaluate a UNION / INTERSECT / EXCEPT compound query to rows.
1847    /// Only UNION (ALL or deduplicated) is supported; INTERSECT and EXCEPT error out.
1848    fn eval_compound_query_rows(
1849        &mut self,
1850        cq: &qusql_parse::CompoundQuery<'a>,
1851    ) -> Result<Vec<Row<'a>>, ()> {
1852        use qusql_parse::CompoundOperator;
1853        let mut result = self.eval_statement_rows(&cq.left)?;
1854        for branch in &cq.with {
1855            match branch.operator {
1856                CompoundOperator::Union => {
1857                    let branch_rows = self.eval_statement_rows(&branch.statement)?;
1858                    result.extend(branch_rows);
1859                }
1860                CompoundOperator::Intersect | CompoundOperator::Except => {
1861                    self.issues.err(
1862                        "INTERSECT / EXCEPT is not supported in schema evaluator",
1863                        &branch.operator_span,
1864                    );
1865                    return Err(());
1866                }
1867            }
1868        }
1869        Ok(result)
1870    }
1871
1872    /// Return the rows from the single table in a SELECT that satisfy the WHERE clause.
1873    fn eval_select_matching_rows(
1874        &mut self,
1875        s: &qusql_parse::Select<'a>,
1876    ) -> Result<Vec<Row<'a>>, ()> {
1877        let source_rows = self.resolve_from_rows(s)?;
1878        let where_expr: Option<Expression<'a>> = s.where_.as_ref().map(|(e, _)| e.clone());
1879        let mut result = Vec::new();
1880        for row in source_rows {
1881            let saved_row = self.current_row.replace(row.clone());
1882            let eval_result = match &where_expr {
1883                Some(expr) => self.eval_expr(expr).map(|v| v.is_truthy()),
1884                None => Ok(true),
1885            };
1886            self.current_row = saved_row;
1887            if eval_result? {
1888                result.push(row);
1889            }
1890        }
1891        Ok(result)
1892    }
1893
1894    fn eval_function_expr(
1895        &mut self,
1896        f: &qusql_parse::FunctionCallExpression<'a>,
1897    ) -> Result<SqlValue<'a>, ()> {
1898        use qusql_parse::Function;
1899        match &f.function {
1900            Function::Other(parts) if parts.len() == 1 => {
1901                let func_name = parts[0].value;
1902                let func_info = self
1903                    .schemas
1904                    .functions
1905                    .values()
1906                    .find(|func| func.name.value == func_name)
1907                    .and_then(|func| {
1908                        func.body
1909                            .as_ref()
1910                            .map(|b| (func.params.clone(), b.statements.clone()))
1911                    });
1912                let Some((params, statements)) = func_info else {
1913                    self.issues.err(
1914                        alloc::format!(
1915                            "Unknown function or function has no evaluable body: {func_name}"
1916                        ),
1917                        f,
1918                    );
1919                    return Err(());
1920                };
1921                let mut bindings = BTreeMap::new();
1922                for (param, arg) in params.iter().zip(f.args.iter()) {
1923                    let Some(name) = &param.name else { continue };
1924                    if let Ok(value) = self.eval_expr(arg) {
1925                        bindings.insert(name.value, value);
1926                    }
1927                }
1928                let old_bindings = core::mem::replace(&mut self.bindings, bindings);
1929                let old_return = self.return_value.take();
1930                let _ = self.process_statements(statements);
1931                let ret = self.return_value.take().unwrap_or(SqlValue::Null);
1932                self.return_value = old_return;
1933                self.bindings = old_bindings;
1934                Ok(ret)
1935            }
1936            Function::Coalesce => {
1937                // Clone args to avoid borrow conflict between &f and &mut self.
1938                let args: Vec<_> = f.args.clone();
1939                for arg in &args {
1940                    let v = self.eval_expr(arg)?;
1941                    if v != SqlValue::Null {
1942                        return Ok(v);
1943                    }
1944                }
1945                Ok(SqlValue::Null)
1946            }
1947            Function::Exists => {
1948                let Some(Expression::Subquery(sq)) = f.args.first() else {
1949                    self.issues.err("EXISTS without subquery argument", f);
1950                    return Err(());
1951                };
1952                let qusql_parse::Statement::Select(s) = &sq.expression else {
1953                    self.issues.err("EXISTS argument is not a SELECT", f);
1954                    return Err(());
1955                };
1956                let s = s.clone();
1957                Ok(SqlValue::Bool(
1958                    !self.eval_select_matching_rows(&s)?.is_empty(),
1959                ))
1960            }
1961            _ => {
1962                self.issues
1963                    .err("Unimplemented function in schema evaluator", f);
1964                Err(())
1965            }
1966        }
1967    }
1968
1969    fn eval_aggregate(
1970        &mut self,
1971        f: &qusql_parse::AggregateFunctionCallExpression<'a>,
1972    ) -> Result<SqlValue<'a>, ()> {
1973        use qusql_parse::Function;
1974        match &f.function {
1975            Function::Max => {
1976                let col_expr = f.args.first().ok_or(())?.clone();
1977                // Take rows out so we can call &mut self methods during iteration.
1978                let rows = core::mem::take(&mut self.current_table_rows);
1979                let mut max: Option<SqlValue<'a>> = None;
1980                for r in &rows {
1981                    // Set current_row so column references in the expression resolve correctly.
1982                    let saved = self.current_row.replace(r.clone());
1983                    // Skip rows where evaluation fails (NULL semantics for aggregates).
1984                    let v = self.eval_expr(&col_expr);
1985                    self.current_row = saved;
1986                    if let Ok(v) = v
1987                        && v != SqlValue::Null
1988                    {
1989                        max = Some(match max {
1990                            None => v,
1991                            Some(SqlValue::Integer(m)) => {
1992                                if let SqlValue::Integer(n) = &v {
1993                                    SqlValue::Integer(m.max(*n))
1994                                } else {
1995                                    v
1996                                }
1997                            }
1998                            Some(existing) => existing,
1999                        });
2000                    }
2001                }
2002                self.current_table_rows = rows;
2003                Ok(max.unwrap_or(SqlValue::Null))
2004            }
2005            Function::Count => {
2006                let rows = core::mem::take(&mut self.current_table_rows);
2007                let is_star = matches!(
2008                    f.args.first(),
2009                    Some(Expression::Identifier(ie))
2010                        if matches!(ie.parts.as_slice(), [IdentifierPart::Star(_)])
2011                );
2012                let count = if f.args.is_empty() || is_star {
2013                    // COUNT(*) or COUNT() - count all rows
2014                    rows.len() as i64
2015                } else {
2016                    let col_expr = f.args.first().unwrap().clone();
2017                    let mut n = 0i64;
2018                    for r in &rows {
2019                        let saved = self.current_row.replace(r.clone());
2020                        let v = self.eval_expr(&col_expr);
2021                        self.current_row = saved;
2022                        if matches!(v, Ok(v) if v != SqlValue::Null) {
2023                            n += 1;
2024                        }
2025                    }
2026                    n
2027                };
2028                self.current_table_rows = rows;
2029                Ok(SqlValue::Integer(count))
2030            }
2031            _ => {
2032                self.issues
2033                    .err("Unimplemented aggregate function in schema evaluator", f);
2034                Err(())
2035            }
2036        }
2037    }
2038
2039    fn eval_binary_expr(
2040        &mut self,
2041        b: &qusql_parse::BinaryExpression<'a>,
2042    ) -> Result<SqlValue<'a>, ()> {
2043        use qusql_parse::BinaryOperator;
2044        let lhs = self.eval_expr(&b.lhs)?;
2045        // Short-circuit AND/OR before evaluating rhs.
2046        match &b.op {
2047            BinaryOperator::And(_) => {
2048                if !lhs.is_truthy() {
2049                    return Ok(SqlValue::Bool(false));
2050                }
2051                return Ok(SqlValue::Bool(self.eval_expr(&b.rhs)?.is_truthy()));
2052            }
2053            BinaryOperator::Or(_) => {
2054                if lhs.is_truthy() {
2055                    return Ok(SqlValue::Bool(true));
2056                }
2057                return Ok(SqlValue::Bool(self.eval_expr(&b.rhs)?.is_truthy()));
2058            }
2059            _ => {}
2060        }
2061        let rhs = self.eval_expr(&b.rhs)?;
2062        // NULL comparisons propagate NULL (not an error).
2063        Ok(match &b.op {
2064            BinaryOperator::Eq(_) => lhs.sql_eq(&rhs).map_or(SqlValue::Null, SqlValue::Bool),
2065            BinaryOperator::Neq(_) => lhs
2066                .sql_eq(&rhs)
2067                .map_or(SqlValue::Null, |v| SqlValue::Bool(!v)),
2068            BinaryOperator::LtEq(_) => lhs.sql_lte(&rhs).map_or(SqlValue::Null, SqlValue::Bool),
2069            BinaryOperator::Lt(_) => rhs
2070                .sql_lte(&lhs)
2071                .map_or(SqlValue::Null, |v| SqlValue::Bool(!v)),
2072            BinaryOperator::GtEq(_) => rhs.sql_lte(&lhs).map_or(SqlValue::Null, SqlValue::Bool),
2073            BinaryOperator::Gt(_) => lhs
2074                .sql_lte(&rhs)
2075                .map_or(SqlValue::Null, |v| SqlValue::Bool(!v)),
2076            _ => {
2077                self.issues
2078                    .err("Unimplemented binary operator in schema evaluator", b);
2079                return Err(());
2080            }
2081        })
2082    }
2083
2084    /// Evaluate the search condition of an IF branch as a boolean.
2085    /// Sets `self.current_table_rows` from the FROM clause (if any) so that
2086    /// aggregate expressions in the condition can read the table rows.
2087    /// Returns `Err(())` if the condition uses constructs the evaluator does not handle.
2088    fn eval_condition(&mut self, s: &qusql_parse::Select<'a>) -> Result<bool, ()> {
2089        let expr = s.select_exprs.first().map(|se| se.expr.clone()).ok_or(())?;
2090
2091        // Load the FROM table's rows into current_table_rows for aggregate evaluation.
2092        let table_rows = self.resolve_from_rows(s)?;
2093        let saved = core::mem::replace(&mut self.current_table_rows, table_rows);
2094        let result = self.eval_expr(&expr);
2095        self.current_table_rows = saved;
2096
2097        Ok(result?.is_truthy())
2098    }
2099
2100    fn process_plpgsql_execute(&mut self, e: qusql_parse::PlpgsqlExecute<'a>) -> Result<(), ()> {
2101        let sql = self.resolve_expr_to_bound_string(&e.command);
2102        let Some(sql) = sql else {
2103            self.issues.err(
2104                "EXECUTE argument could not be resolved to a known SQL string",
2105                &e,
2106            );
2107            return Err(());
2108        };
2109        let span_offset = sql.as_ptr() as usize - self.src.as_ptr() as usize;
2110        let opts = self.options.parse_options.clone().span_offset(span_offset);
2111        let stmts = parse_statements(sql, self.issues, &opts);
2112        let _ = self.process_statements(stmts);
2113        Ok(())
2114    }
2115
2116    /// Try to resolve an expression to a `&'a str` from the current bindings.
2117    /// Only succeeds for bare identifier expressions that name a bound parameter
2118    /// whose value is a borrow from the original source text.
2119    fn resolve_expr_to_bound_string(&self, expr: &Expression<'a>) -> Option<&'a str> {
2120        let Expression::Identifier(ident) = expr else {
2121            return None;
2122        };
2123        let [IdentifierPart::Name(name)] = ident.parts.as_slice() else {
2124            return None;
2125        };
2126        self.bindings
2127            .get(name.value)
2128            .and_then(|v| v.as_source_text())
2129    }
2130}
2131
2132/// Parse a schema definition and return a terse description
2133///
2134/// Errors and warnings are added to issues. The schema is successfully
2135/// parsed if no errors are added to issues.
2136/// Built-in table/view definitions that are automatically available in
2137/// PostgreSQL and PostGIS databases.  These are injected into every
2138/// `Schemas` produced by [`parse_schemas`] when the dialect is
2139/// [`SQLDialect::PostgreSQL`] or [`SQLDialect::PostGIS`].
2140const POSTGRESQL_BUILTIN_SQL: &str = "
2141CREATE TABLE spatial_ref_sys (
2142    srid INTEGER NOT NULL,
2143    auth_name VARCHAR(256),
2144    auth_srid INTEGER,
2145    srtext VARCHAR(2048),
2146    proj4text VARCHAR(2048)
2147);
2148CREATE VIEW geometry_columns AS (
2149    SELECT '' AS f_table_catalog, '' AS f_table_schema, '' AS f_table_name,
2150           '' AS f_geometry_column, 0 AS coord_dimension, 0 AS srid, '' AS type
2151);
2152CREATE VIEW geography_columns AS (
2153    SELECT '' AS f_table_catalog, '' AS f_table_schema, '' AS f_table_name,
2154           '' AS f_geography_column, 0 AS coord_dimension, 0 AS srid, '' AS type
2155);
2156";
2157
2158pub fn parse_schemas<'a>(
2159    src: &'a str,
2160    issues: &mut Issues<'a>,
2161    options: &TypeOptions,
2162) -> Schemas<'a> {
2163    let statements = parse_statements(src, issues, &options.parse_options);
2164
2165    let mut schemas = Schemas {
2166        schemas: Default::default(),
2167        procedures: Default::default(),
2168        functions: Default::default(),
2169        indices: Default::default(),
2170        types: Default::default(),
2171    };
2172
2173    SchemaCtx::new(&mut schemas, issues, src, options).process_top_level_statements(statements);
2174
2175    let dummy_schemas = Schemas::default();
2176
2177    let mut typer = crate::typer::Typer {
2178        schemas: &dummy_schemas,
2179        issues,
2180        reference_types: Vec::new(),
2181        outer_reference_types: Vec::new(),
2182        arg_types: Default::default(),
2183        options,
2184        with_schemas: Default::default(),
2185    };
2186
2187    // Compute nullity of generated columns
2188    for (name, schema) in &mut schemas.schemas {
2189        if schema.columns.iter().all(|v| v.as_.is_none()) {
2190            continue;
2191        }
2192        typer.reference_types.clear();
2193        let mut columns = Vec::new();
2194        for c in &schema.columns {
2195            columns.push((c.identifier.clone(), c.type_.clone()));
2196        }
2197        typer.reference_types.push(crate::typer::ReferenceType {
2198            name: Some(name.clone()),
2199            span: schema.identifier_span.clone(),
2200            columns,
2201        });
2202        for c in &mut schema.columns {
2203            if let Some(as_) = &c.as_ {
2204                let full_type = crate::type_expression::type_expression(
2205                    &mut typer,
2206                    as_,
2207                    crate::type_expression::ExpressionFlags::default(),
2208                    BaseType::Any,
2209                );
2210                c.type_.not_null = full_type.not_null;
2211            }
2212        }
2213    }
2214
2215    // Inject dialect-specific built-in schemas so that system tables like
2216    // `spatial_ref_sys` are always resolvable without requiring the user to
2217    // declare them in their schema file.
2218    let dialect = options.parse_options.get_dialect();
2219    if dialect.is_postgresql() {
2220        // Coerce 'static to &'a str — sound because 'static: 'a.
2221        let builtin_src: &'a str = POSTGRESQL_BUILTIN_SQL;
2222        let builtin_options = TypeOptions::new().dialect(dialect);
2223        let builtin_stmts = parse_statements(
2224            builtin_src,
2225            &mut Issues::new(builtin_src),
2226            &builtin_options.parse_options,
2227        );
2228        let mut builtin_schemas: Schemas<'a> = Schemas::default();
2229        SchemaCtx::new(
2230            &mut builtin_schemas,
2231            &mut Issues::new(builtin_src),
2232            builtin_src,
2233            &builtin_options,
2234        )
2235        .process_top_level_statements(builtin_stmts);
2236        // User-defined tables take priority; only add entries not already present.
2237        for (k, v) in builtin_schemas.schemas {
2238            schemas.schemas.entry(k).or_insert(v);
2239        }
2240    }
2241
2242    schemas
2243}