// Source path: reddb_server/storage/query/sql.rs

1use crate::catalog::CollectionModel;
2use crate::storage::query::ast::{
3    AlterMetricQuery, AlterQueueQuery, AlterTableQuery, AlterUserStmt, ApplyMigrationQuery,
4    AskQuery, BinOp, CompareOp, ConfigCommand, CopyFormat, CopyFromQuery, CreateCollectionQuery,
5    CreateForeignTableQuery, CreateIndexQuery, CreateMetricQuery, CreateMigrationQuery,
6    CreatePolicyQuery, CreateQueueQuery, CreateSchemaQuery, CreateSequenceQuery, CreateServerQuery,
7    CreateSloQuery, CreateTableQuery, CreateTimeSeriesQuery, CreateTreeQuery, CreateVectorQuery,
8    CreateViewQuery, DeleteQuery, DropCollectionQuery, DropDocumentQuery, DropForeignTableQuery,
9    DropGraphQuery, DropIndexQuery, DropKvQuery, DropPolicyQuery, DropQueueQuery, DropSchemaQuery,
10    DropSequenceQuery, DropServerQuery, DropTableQuery, DropTimeSeriesQuery, DropTreeQuery,
11    DropVectorQuery, DropViewQuery, EventsBackfillQuery, ExplainAlterQuery, ExplainMigrationQuery,
12    Expr, FieldRef, Filter, ForeignColumnDef, GrantStmt, GraphCommand, GraphQuery, HybridQuery,
13    InsertQuery, JoinQuery, KvCommand, MaintenanceCommand, PathQuery, PolicyAction,
14    ProbabilisticCommand, QueryExpr, QueueCommand, QueueSelectQuery, RefreshMaterializedViewQuery,
15    RevokeStmt, RollbackMigrationQuery, SearchCommand, Span, TableQuery, TreeCommand,
16    TruncateQuery, TxnControl, UpdateQuery, VectorQuery,
17};
18use crate::storage::query::parser::{ParseError, Parser, SafeTokenDisplay};
19use crate::storage::query::sql_lowering::filter_to_expr;
20use crate::storage::query::Token;
21use crate::storage::schema::Value;
22
23/// Canonical SQL frontend command surface.
24///
25/// This is the single entrypoint for SQL/RQL-style commands before they are
26/// lowered into the broader multi-backend `QueryExpr` space.
27#[derive(Debug, Clone)]
28pub enum SqlStatement {
29    Query(SqlQuery),
30    Mutation(SqlMutation),
31    Schema(SqlSchemaCommand),
32    Admin(SqlAdminCommand),
33}
34
35#[derive(Debug, Clone)]
36#[allow(clippy::large_enum_variant)]
37pub enum FrontendStatement {
38    Sql(SqlStatement),
39    Graph(GraphQuery),
40    GraphCommand(GraphCommand),
41    Path(PathQuery),
42    Vector(VectorQuery),
43    Hybrid(HybridQuery),
44    Search(SearchCommand),
45    Ask(AskQuery),
46    QueueSelect(QueueSelectQuery),
47    QueueCommand(QueueCommand),
48    EventsBackfill(EventsBackfillQuery),
49    EventsBackfillStatus { collection: String },
50    TreeCommand(TreeCommand),
51    ProbabilisticCommand(ProbabilisticCommand),
52    KvCommand(KvCommand),
53    ConfigCommand(ConfigCommand),
54}
55
56#[derive(Debug, Clone)]
57pub enum SqlCommand {
58    Select(TableQuery),
59    Join(JoinQuery),
60    Insert(InsertQuery),
61    Update(UpdateQuery),
62    Delete(DeleteQuery),
63    ExplainAlter(ExplainAlterQuery),
64    CreateTable(CreateTableQuery),
65    CreateCollection(CreateCollectionQuery),
66    CreateVector(CreateVectorQuery),
67    DropTable(DropTableQuery),
68    DropGraph(DropGraphQuery),
69    DropVector(DropVectorQuery),
70    DropDocument(DropDocumentQuery),
71    DropKv(DropKvQuery),
72    DropCollection(DropCollectionQuery),
73    Truncate(TruncateQuery),
74    AlterTable(AlterTableQuery),
75    CreateIndex(CreateIndexQuery),
76    DropIndex(DropIndexQuery),
77    CreateTimeSeries(CreateTimeSeriesQuery),
78    CreateMetric(CreateMetricQuery),
79    AlterMetric(AlterMetricQuery),
80    CreateSlo(CreateSloQuery),
81    DropTimeSeries(DropTimeSeriesQuery),
82    CreateQueue(CreateQueueQuery),
83    AlterQueue(AlterQueueQuery),
84    DropQueue(DropQueueQuery),
85    CreateTree(CreateTreeQuery),
86    DropTree(DropTreeQuery),
87    Probabilistic(ProbabilisticCommand),
88    SetConfig {
89        key: String,
90        value: Value,
91    },
92    ShowConfig {
93        prefix: Option<String>,
94    },
95    SetSecret {
96        key: String,
97        value: Value,
98    },
99    DeleteSecret {
100        key: String,
101    },
102    ShowSecrets {
103        prefix: Option<String>,
104    },
105    SetTenant(Option<String>),
106    ShowTenant,
107    TransactionControl(TxnControl),
108    Maintenance(MaintenanceCommand),
109    CreateSchema(CreateSchemaQuery),
110    DropSchema(DropSchemaQuery),
111    CreateSequence(CreateSequenceQuery),
112    DropSequence(DropSequenceQuery),
113    CopyFrom(CopyFromQuery),
114    CreateView(CreateViewQuery),
115    DropView(DropViewQuery),
116    RefreshMaterializedView(RefreshMaterializedViewQuery),
117    CreatePolicy(CreatePolicyQuery),
118    DropPolicy(DropPolicyQuery),
119    CreateServer(CreateServerQuery),
120    DropServer(DropServerQuery),
121    CreateForeignTable(CreateForeignTableQuery),
122    DropForeignTable(DropForeignTableQuery),
123    /// `GRANT … ON … TO …`
124    Grant(GrantStmt),
125    /// `REVOKE … ON … FROM …`
126    Revoke(RevokeStmt),
127    /// `ALTER USER name <attrs>`
128    AlterUser(AlterUserStmt),
129    /// IAM policy DDL (CREATE POLICY '...' AS '...', DROP POLICY '...',
130    /// ATTACH/DETACH POLICY, SHOW POLICIES, SIMULATE, SHOW EFFECTIVE
131    /// PERMISSIONS). Stored as a pre-built QueryExpr so the dispatcher
132    /// can route the multitude of shapes through a single arm.
133    IamPolicy(QueryExpr),
134    CreateMigration(CreateMigrationQuery),
135    ApplyMigration(ApplyMigrationQuery),
136    RollbackMigration(RollbackMigrationQuery),
137    ExplainMigration(ExplainMigrationQuery),
138}
139
140/// Issue #789 — Analytics v0 non-goal map for `CREATE …` forms.
141///
142/// PRD #782 ringfences Analytics v0 around a metric-centric catalog and
143/// explicitly excludes generic analytics objects, a separate event
144/// storage model, cohorts, funnels, SLA contracts, and adapter surfaces.
145/// When the parser sees one of these idents in the `CREATE` head, return
146/// a stable v0-scoped rejection message; otherwise return `None` and let
147/// the regular CREATE fallback handle the token.
148fn analytics_v0_non_goal_create(token: &Token) -> Option<String> {
149    let ident = match token {
150        Token::Ident(s) => s,
151        _ => return None,
152    };
153    let upper = ident.to_ascii_uppercase();
154    let message = match upper.as_str() {
155        "ANALYTICS" => {
156            "CREATE ANALYTICS is not supported in Analytics v0 — \
157             use CREATE METRIC <dotted.path> for the metric-centric \
158             catalog (PRD #782 non-goal)"
159        }
160        "EVENT" => {
161            "CREATE EVENT is not supported in Analytics v0 — \
162             event-shaped data lives in ordinary TABLE/DOCUMENT \
163             collections, not a new storage model (PRD #782 non-goal)"
164        }
165        "COHORT" => {
166            "CREATE COHORT is not supported in Analytics v0 — \
167             cohort surfaces are deferred (PRD #782 non-goal)"
168        }
169        "FUNNEL" => {
170            "CREATE FUNNEL is not supported in Analytics v0 — \
171             funnel surfaces are deferred (PRD #782 non-goal)"
172        }
173        "SLA" => {
174            "CREATE SLA is not supported in Analytics v0 — \
175             SLA/legal/commercial contract modeling is post-MVP \
176             (PRD #782 non-goal)"
177        }
178        "ADAPTER" => {
179            "CREATE ADAPTER is not supported in Analytics v0 — \
180             Prometheus/Grafana/Snowplow/Google Analytics adapters \
181             are deferred (PRD #782 non-goal)"
182        }
183        _ => return None,
184    };
185    Some(message.to_string())
186}
187
188fn collection_model_filter(model: &str) -> Filter {
189    Filter::Compare {
190        field: FieldRef::column("", "model"),
191        op: CompareOp::Eq,
192        value: Value::Text(model.to_string().into()),
193    }
194}
195
196fn add_table_filter(query: &mut TableQuery, filter: Filter) {
197    let combined = match query.filter.take() {
198        Some(existing) => existing.and(filter),
199        None => filter,
200    };
201    query.where_expr = Some(filter_to_expr(&combined));
202    query.filter = Some(combined);
203}
204
205fn parse_show_collections_by_model(
206    parser: &mut Parser<'_>,
207    model: &str,
208) -> Result<TableQuery, ParseError> {
209    let mut query = TableQuery::new("red.collections");
210    parser.parse_table_clauses(&mut query)?;
211    add_table_filter(&mut query, collection_model_filter(model));
212    Ok(query)
213}
214
215#[derive(Debug, Clone)]
216#[allow(clippy::large_enum_variant)]
217pub enum SqlQuery {
218    Select(TableQuery),
219    Join(JoinQuery),
220}
221
222#[derive(Debug, Clone)]
223pub enum SqlMutation {
224    Insert(InsertQuery),
225    Update(UpdateQuery),
226    Delete(DeleteQuery),
227}
228
229#[derive(Debug, Clone)]
230pub enum SqlSchemaCommand {
231    ExplainAlter(ExplainAlterQuery),
232    CreateTable(CreateTableQuery),
233    CreateCollection(CreateCollectionQuery),
234    CreateVector(CreateVectorQuery),
235    DropTable(DropTableQuery),
236    DropGraph(DropGraphQuery),
237    DropVector(DropVectorQuery),
238    DropDocument(DropDocumentQuery),
239    DropKv(DropKvQuery),
240    DropCollection(DropCollectionQuery),
241    Truncate(TruncateQuery),
242    AlterTable(AlterTableQuery),
243    CreateIndex(CreateIndexQuery),
244    DropIndex(DropIndexQuery),
245    CreateTimeSeries(CreateTimeSeriesQuery),
246    CreateMetric(CreateMetricQuery),
247    AlterMetric(AlterMetricQuery),
248    CreateSlo(CreateSloQuery),
249    DropTimeSeries(DropTimeSeriesQuery),
250    CreateQueue(CreateQueueQuery),
251    AlterQueue(AlterQueueQuery),
252    DropQueue(DropQueueQuery),
253    CreateTree(CreateTreeQuery),
254    DropTree(DropTreeQuery),
255    Probabilistic(ProbabilisticCommand),
256    CreateSchema(CreateSchemaQuery),
257    DropSchema(DropSchemaQuery),
258    CreateSequence(CreateSequenceQuery),
259    DropSequence(DropSequenceQuery),
260    CopyFrom(CopyFromQuery),
261    CreateView(CreateViewQuery),
262    DropView(DropViewQuery),
263    RefreshMaterializedView(RefreshMaterializedViewQuery),
264    CreatePolicy(CreatePolicyQuery),
265    DropPolicy(DropPolicyQuery),
266    CreateServer(CreateServerQuery),
267    DropServer(DropServerQuery),
268    CreateForeignTable(CreateForeignTableQuery),
269    DropForeignTable(DropForeignTableQuery),
270    CreateMigration(CreateMigrationQuery),
271    ApplyMigration(ApplyMigrationQuery),
272    RollbackMigration(RollbackMigrationQuery),
273    ExplainMigration(ExplainMigrationQuery),
274}
275
276#[derive(Debug, Clone)]
277#[allow(clippy::large_enum_variant)]
278pub enum SqlAdminCommand {
279    SetConfig { key: String, value: Value },
280    ShowConfig { prefix: Option<String> },
281    SetSecret { key: String, value: Value },
282    DeleteSecret { key: String },
283    ShowSecrets { prefix: Option<String> },
284    SetTenant(Option<String>),
285    ShowTenant,
286    TransactionControl(TxnControl),
287    Maintenance(MaintenanceCommand),
288    Grant(GrantStmt),
289    Revoke(RevokeStmt),
290    AlterUser(AlterUserStmt),
291    IamPolicy(QueryExpr),
292}
293
294impl SqlStatement {
295    pub fn into_command(self) -> SqlCommand {
296        match self {
297            SqlStatement::Query(SqlQuery::Select(query)) => SqlCommand::Select(query),
298            SqlStatement::Query(SqlQuery::Join(query)) => SqlCommand::Join(query),
299            SqlStatement::Mutation(SqlMutation::Insert(query)) => SqlCommand::Insert(query),
300            SqlStatement::Mutation(SqlMutation::Update(query)) => SqlCommand::Update(query),
301            SqlStatement::Mutation(SqlMutation::Delete(query)) => SqlCommand::Delete(query),
302            SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query)) => {
303                SqlCommand::ExplainAlter(query)
304            }
305            SqlStatement::Schema(SqlSchemaCommand::CreateTable(query)) => {
306                SqlCommand::CreateTable(query)
307            }
308            SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query)) => {
309                SqlCommand::CreateCollection(query)
310            }
311            SqlStatement::Schema(SqlSchemaCommand::CreateVector(query)) => {
312                SqlCommand::CreateVector(query)
313            }
314            SqlStatement::Schema(SqlSchemaCommand::DropTable(query)) => {
315                SqlCommand::DropTable(query)
316            }
317            SqlStatement::Schema(SqlSchemaCommand::DropGraph(query)) => {
318                SqlCommand::DropGraph(query)
319            }
320            SqlStatement::Schema(SqlSchemaCommand::DropVector(query)) => {
321                SqlCommand::DropVector(query)
322            }
323            SqlStatement::Schema(SqlSchemaCommand::DropDocument(query)) => {
324                SqlCommand::DropDocument(query)
325            }
326            SqlStatement::Schema(SqlSchemaCommand::DropKv(query)) => SqlCommand::DropKv(query),
327            SqlStatement::Schema(SqlSchemaCommand::DropCollection(query)) => {
328                SqlCommand::DropCollection(query)
329            }
330            SqlStatement::Schema(SqlSchemaCommand::Truncate(query)) => SqlCommand::Truncate(query),
331            SqlStatement::Schema(SqlSchemaCommand::AlterTable(query)) => {
332                SqlCommand::AlterTable(query)
333            }
334            SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query)) => {
335                SqlCommand::CreateIndex(query)
336            }
337            SqlStatement::Schema(SqlSchemaCommand::DropIndex(query)) => {
338                SqlCommand::DropIndex(query)
339            }
340            SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query)) => {
341                SqlCommand::CreateTimeSeries(query)
342            }
343            SqlStatement::Schema(SqlSchemaCommand::CreateMetric(query)) => {
344                SqlCommand::CreateMetric(query)
345            }
346            SqlStatement::Schema(SqlSchemaCommand::AlterMetric(query)) => {
347                SqlCommand::AlterMetric(query)
348            }
349            SqlStatement::Schema(SqlSchemaCommand::CreateSlo(query)) => {
350                SqlCommand::CreateSlo(query)
351            }
352            SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query)) => {
353                SqlCommand::DropTimeSeries(query)
354            }
355            SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query)) => {
356                SqlCommand::CreateQueue(query)
357            }
358            SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query)) => {
359                SqlCommand::AlterQueue(query)
360            }
361            SqlStatement::Schema(SqlSchemaCommand::DropQueue(query)) => {
362                SqlCommand::DropQueue(query)
363            }
364            SqlStatement::Schema(SqlSchemaCommand::CreateTree(query)) => {
365                SqlCommand::CreateTree(query)
366            }
367            SqlStatement::Schema(SqlSchemaCommand::DropTree(query)) => SqlCommand::DropTree(query),
368            SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command)) => {
369                SqlCommand::Probabilistic(command)
370            }
371            SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value }) => {
372                SqlCommand::SetConfig { key, value }
373            }
374            SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix }) => {
375                SqlCommand::ShowConfig { prefix }
376            }
377            SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value }) => {
378                SqlCommand::SetSecret { key, value }
379            }
380            SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key }) => {
381                SqlCommand::DeleteSecret { key }
382            }
383            SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix }) => {
384                SqlCommand::ShowSecrets { prefix }
385            }
386            SqlStatement::Admin(SqlAdminCommand::SetTenant(value)) => SqlCommand::SetTenant(value),
387            SqlStatement::Admin(SqlAdminCommand::ShowTenant) => SqlCommand::ShowTenant,
388            SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl)) => {
389                SqlCommand::TransactionControl(ctl)
390            }
391            SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)) => SqlCommand::Maintenance(cmd),
392            SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)) => SqlCommand::CreateSchema(q),
393            SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)) => SqlCommand::DropSchema(q),
394            SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q)) => {
395                SqlCommand::CreateSequence(q)
396            }
397            SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)) => SqlCommand::DropSequence(q),
398            SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)) => SqlCommand::CopyFrom(q),
399            SqlStatement::Schema(SqlSchemaCommand::CreateView(q)) => SqlCommand::CreateView(q),
400            SqlStatement::Schema(SqlSchemaCommand::DropView(q)) => SqlCommand::DropView(q),
401            SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q)) => {
402                SqlCommand::RefreshMaterializedView(q)
403            }
404            SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)) => SqlCommand::CreatePolicy(q),
405            SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)) => SqlCommand::DropPolicy(q),
406            SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)) => SqlCommand::CreateServer(q),
407            SqlStatement::Schema(SqlSchemaCommand::DropServer(q)) => SqlCommand::DropServer(q),
408            SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q)) => {
409                SqlCommand::CreateForeignTable(q)
410            }
411            SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q)) => {
412                SqlCommand::DropForeignTable(q)
413            }
414            SqlStatement::Admin(SqlAdminCommand::Grant(s)) => SqlCommand::Grant(s),
415            SqlStatement::Admin(SqlAdminCommand::Revoke(s)) => SqlCommand::Revoke(s),
416            SqlStatement::Admin(SqlAdminCommand::AlterUser(s)) => SqlCommand::AlterUser(s),
417            SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)) => SqlCommand::IamPolicy(e),
418            SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q)) => {
419                SqlCommand::CreateMigration(q)
420            }
421            SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q)) => {
422                SqlCommand::ApplyMigration(q)
423            }
424            SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q)) => {
425                SqlCommand::RollbackMigration(q)
426            }
427            SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q)) => {
428                SqlCommand::ExplainMigration(q)
429            }
430        }
431    }
432
433    pub fn into_query_expr(self) -> QueryExpr {
434        self.into_command().into_query_expr()
435    }
436}
437
438impl FrontendStatement {
439    pub fn into_query_expr(self) -> QueryExpr {
440        match self {
441            FrontendStatement::Sql(statement) => statement.into_query_expr(),
442            FrontendStatement::Graph(query) => QueryExpr::Graph(query),
443            FrontendStatement::GraphCommand(command) => QueryExpr::GraphCommand(command),
444            FrontendStatement::Path(query) => QueryExpr::Path(query),
445            FrontendStatement::Vector(query) => QueryExpr::Vector(query),
446            FrontendStatement::Hybrid(query) => QueryExpr::Hybrid(query),
447            FrontendStatement::Search(command) => QueryExpr::SearchCommand(command),
448            FrontendStatement::Ask(query) => QueryExpr::Ask(query),
449            FrontendStatement::QueueSelect(query) => QueryExpr::QueueSelect(query),
450            FrontendStatement::QueueCommand(command) => QueryExpr::QueueCommand(command),
451            FrontendStatement::EventsBackfill(query) => QueryExpr::EventsBackfill(query),
452            FrontendStatement::EventsBackfillStatus { collection } => {
453                QueryExpr::EventsBackfillStatus { collection }
454            }
455            FrontendStatement::TreeCommand(command) => QueryExpr::TreeCommand(command),
456            FrontendStatement::ProbabilisticCommand(command) => {
457                QueryExpr::ProbabilisticCommand(command)
458            }
459            FrontendStatement::KvCommand(command) => QueryExpr::KvCommand(command),
460            FrontendStatement::ConfigCommand(command) => QueryExpr::ConfigCommand(command),
461        }
462    }
463}
464
465pub fn parse_frontend(input: &str) -> Result<FrontendStatement, ParseError> {
466    let mut parser = Parser::new(input)?;
467    let statement = parser.parse_frontend_statement()?;
468    if !parser.check(&Token::Eof) {
469        return Err(ParseError::new(
470            // F-05: `Token::Ident` / `Token::String` / `Token::JsonLiteral`
471            // Display arms emit raw user bytes. Render via `{:?}` so
472            // embedded CR/LF/NUL/quotes are escaped before the message
473            // reaches downstream JSON / audit / log / gRPC sinks.
474            format!("Unexpected token after query: {:?}", parser.current.token),
475            parser.position(),
476        ));
477    }
478    Ok(statement)
479}
480
481impl SqlCommand {
482    pub fn into_query_expr(self) -> QueryExpr {
483        match self {
484            SqlCommand::Select(query) => QueryExpr::Table(query),
485            SqlCommand::Join(query) => QueryExpr::Join(query),
486            SqlCommand::Insert(query) => QueryExpr::Insert(query),
487            SqlCommand::Update(query) => QueryExpr::Update(query),
488            SqlCommand::Delete(query) => QueryExpr::Delete(query),
489            SqlCommand::ExplainAlter(query) => QueryExpr::ExplainAlter(query),
490            SqlCommand::CreateTable(query) => QueryExpr::CreateTable(query),
491            SqlCommand::CreateCollection(query) => QueryExpr::CreateCollection(query),
492            SqlCommand::CreateVector(query) => QueryExpr::CreateVector(query),
493            SqlCommand::DropTable(query) => QueryExpr::DropTable(query),
494            SqlCommand::DropGraph(query) => QueryExpr::DropGraph(query),
495            SqlCommand::DropVector(query) => QueryExpr::DropVector(query),
496            SqlCommand::DropDocument(query) => QueryExpr::DropDocument(query),
497            SqlCommand::DropKv(query) => QueryExpr::DropKv(query),
498            SqlCommand::DropCollection(query) => QueryExpr::DropCollection(query),
499            SqlCommand::Truncate(query) => QueryExpr::Truncate(query),
500            SqlCommand::AlterTable(query) => QueryExpr::AlterTable(query),
501            SqlCommand::CreateIndex(query) => QueryExpr::CreateIndex(query),
502            SqlCommand::DropIndex(query) => QueryExpr::DropIndex(query),
503            SqlCommand::CreateTimeSeries(query) => QueryExpr::CreateTimeSeries(query),
504            SqlCommand::CreateMetric(query) => QueryExpr::CreateMetric(query),
505            SqlCommand::AlterMetric(query) => QueryExpr::AlterMetric(query),
506            SqlCommand::CreateSlo(query) => QueryExpr::CreateSlo(query),
507            SqlCommand::DropTimeSeries(query) => QueryExpr::DropTimeSeries(query),
508            SqlCommand::CreateQueue(query) => QueryExpr::CreateQueue(query),
509            SqlCommand::AlterQueue(query) => QueryExpr::AlterQueue(query),
510            SqlCommand::DropQueue(query) => QueryExpr::DropQueue(query),
511            SqlCommand::CreateTree(query) => QueryExpr::CreateTree(query),
512            SqlCommand::DropTree(query) => QueryExpr::DropTree(query),
513            SqlCommand::Probabilistic(command) => QueryExpr::ProbabilisticCommand(command),
514            SqlCommand::SetConfig { key, value } => QueryExpr::SetConfig { key, value },
515            SqlCommand::ShowConfig { prefix } => QueryExpr::ShowConfig { prefix },
516            SqlCommand::SetSecret { key, value } => QueryExpr::SetSecret { key, value },
517            SqlCommand::DeleteSecret { key } => QueryExpr::DeleteSecret { key },
518            SqlCommand::ShowSecrets { prefix } => QueryExpr::ShowSecrets { prefix },
519            SqlCommand::SetTenant(value) => QueryExpr::SetTenant(value),
520            SqlCommand::ShowTenant => QueryExpr::ShowTenant,
521            SqlCommand::TransactionControl(ctl) => QueryExpr::TransactionControl(ctl),
522            SqlCommand::Maintenance(cmd) => QueryExpr::MaintenanceCommand(cmd),
523            SqlCommand::CreateSchema(q) => QueryExpr::CreateSchema(q),
524            SqlCommand::DropSchema(q) => QueryExpr::DropSchema(q),
525            SqlCommand::CreateSequence(q) => QueryExpr::CreateSequence(q),
526            SqlCommand::DropSequence(q) => QueryExpr::DropSequence(q),
527            SqlCommand::CopyFrom(q) => QueryExpr::CopyFrom(q),
528            SqlCommand::CreateView(q) => QueryExpr::CreateView(q),
529            SqlCommand::DropView(q) => QueryExpr::DropView(q),
530            SqlCommand::RefreshMaterializedView(q) => QueryExpr::RefreshMaterializedView(q),
531            SqlCommand::CreatePolicy(q) => QueryExpr::CreatePolicy(q),
532            SqlCommand::DropPolicy(q) => QueryExpr::DropPolicy(q),
533            SqlCommand::CreateServer(q) => QueryExpr::CreateServer(q),
534            SqlCommand::DropServer(q) => QueryExpr::DropServer(q),
535            SqlCommand::CreateForeignTable(q) => QueryExpr::CreateForeignTable(q),
536            SqlCommand::DropForeignTable(q) => QueryExpr::DropForeignTable(q),
537            SqlCommand::Grant(s) => QueryExpr::Grant(s),
538            SqlCommand::Revoke(s) => QueryExpr::Revoke(s),
539            SqlCommand::AlterUser(s) => QueryExpr::AlterUser(s),
540            SqlCommand::IamPolicy(e) => e,
541            SqlCommand::CreateMigration(q) => QueryExpr::CreateMigration(q),
542            SqlCommand::ApplyMigration(q) => QueryExpr::ApplyMigration(q),
543            SqlCommand::RollbackMigration(q) => QueryExpr::RollbackMigration(q),
544            SqlCommand::ExplainMigration(q) => QueryExpr::ExplainMigration(q),
545        }
546    }
547
548    pub fn into_statement(self) -> SqlStatement {
549        match self {
550            SqlCommand::Select(query) => SqlStatement::Query(SqlQuery::Select(query)),
551            SqlCommand::Join(query) => SqlStatement::Query(SqlQuery::Join(query)),
552            SqlCommand::Insert(query) => SqlStatement::Mutation(SqlMutation::Insert(query)),
553            SqlCommand::Update(query) => SqlStatement::Mutation(SqlMutation::Update(query)),
554            SqlCommand::Delete(query) => SqlStatement::Mutation(SqlMutation::Delete(query)),
555            SqlCommand::ExplainAlter(query) => {
556                SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query))
557            }
558            SqlCommand::CreateTable(query) => {
559                SqlStatement::Schema(SqlSchemaCommand::CreateTable(query))
560            }
561            SqlCommand::CreateCollection(query) => {
562                SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query))
563            }
564            SqlCommand::CreateVector(query) => {
565                SqlStatement::Schema(SqlSchemaCommand::CreateVector(query))
566            }
567            SqlCommand::DropTable(query) => {
568                SqlStatement::Schema(SqlSchemaCommand::DropTable(query))
569            }
570            SqlCommand::DropGraph(query) => {
571                SqlStatement::Schema(SqlSchemaCommand::DropGraph(query))
572            }
573            SqlCommand::DropVector(query) => {
574                SqlStatement::Schema(SqlSchemaCommand::DropVector(query))
575            }
576            SqlCommand::DropDocument(query) => {
577                SqlStatement::Schema(SqlSchemaCommand::DropDocument(query))
578            }
579            SqlCommand::DropKv(query) => SqlStatement::Schema(SqlSchemaCommand::DropKv(query)),
580            SqlCommand::DropCollection(query) => {
581                SqlStatement::Schema(SqlSchemaCommand::DropCollection(query))
582            }
583            SqlCommand::Truncate(query) => SqlStatement::Schema(SqlSchemaCommand::Truncate(query)),
584            SqlCommand::AlterTable(query) => {
585                SqlStatement::Schema(SqlSchemaCommand::AlterTable(query))
586            }
587            SqlCommand::CreateIndex(query) => {
588                SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query))
589            }
590            SqlCommand::DropIndex(query) => {
591                SqlStatement::Schema(SqlSchemaCommand::DropIndex(query))
592            }
593            SqlCommand::CreateTimeSeries(query) => {
594                SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query))
595            }
596            SqlCommand::CreateMetric(query) => {
597                SqlStatement::Schema(SqlSchemaCommand::CreateMetric(query))
598            }
599            SqlCommand::AlterMetric(query) => {
600                SqlStatement::Schema(SqlSchemaCommand::AlterMetric(query))
601            }
602            SqlCommand::CreateSlo(query) => {
603                SqlStatement::Schema(SqlSchemaCommand::CreateSlo(query))
604            }
605            SqlCommand::DropTimeSeries(query) => {
606                SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query))
607            }
608            SqlCommand::CreateQueue(query) => {
609                SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query))
610            }
611            SqlCommand::AlterQueue(query) => {
612                SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query))
613            }
614            SqlCommand::DropQueue(query) => {
615                SqlStatement::Schema(SqlSchemaCommand::DropQueue(query))
616            }
617            SqlCommand::CreateTree(query) => {
618                SqlStatement::Schema(SqlSchemaCommand::CreateTree(query))
619            }
620            SqlCommand::DropTree(query) => SqlStatement::Schema(SqlSchemaCommand::DropTree(query)),
621            SqlCommand::Probabilistic(command) => {
622                SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command))
623            }
624            SqlCommand::SetConfig { key, value } => {
625                SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value })
626            }
627            SqlCommand::ShowConfig { prefix } => {
628                SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix })
629            }
630            SqlCommand::SetSecret { key, value } => {
631                SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value })
632            }
633            SqlCommand::DeleteSecret { key } => {
634                SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key })
635            }
636            SqlCommand::ShowSecrets { prefix } => {
637                SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix })
638            }
639            SqlCommand::SetTenant(value) => SqlStatement::Admin(SqlAdminCommand::SetTenant(value)),
640            SqlCommand::ShowTenant => SqlStatement::Admin(SqlAdminCommand::ShowTenant),
641            SqlCommand::TransactionControl(ctl) => {
642                SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl))
643            }
644            SqlCommand::Maintenance(cmd) => SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)),
645            SqlCommand::CreateSchema(q) => SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)),
646            SqlCommand::DropSchema(q) => SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)),
647            SqlCommand::CreateSequence(q) => {
648                SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q))
649            }
650            SqlCommand::DropSequence(q) => SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)),
651            SqlCommand::CopyFrom(q) => SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)),
652            SqlCommand::CreateView(q) => SqlStatement::Schema(SqlSchemaCommand::CreateView(q)),
653            SqlCommand::DropView(q) => SqlStatement::Schema(SqlSchemaCommand::DropView(q)),
654            SqlCommand::RefreshMaterializedView(q) => {
655                SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q))
656            }
657            SqlCommand::CreatePolicy(q) => SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)),
658            SqlCommand::DropPolicy(q) => SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)),
659            SqlCommand::CreateServer(q) => SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)),
660            SqlCommand::DropServer(q) => SqlStatement::Schema(SqlSchemaCommand::DropServer(q)),
661            SqlCommand::CreateForeignTable(q) => {
662                SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q))
663            }
664            SqlCommand::DropForeignTable(q) => {
665                SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q))
666            }
667            SqlCommand::Grant(s) => SqlStatement::Admin(SqlAdminCommand::Grant(s)),
668            SqlCommand::Revoke(s) => SqlStatement::Admin(SqlAdminCommand::Revoke(s)),
669            SqlCommand::AlterUser(s) => SqlStatement::Admin(SqlAdminCommand::AlterUser(s)),
670            SqlCommand::IamPolicy(e) => SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)),
671            SqlCommand::CreateMigration(q) => {
672                SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q))
673            }
674            SqlCommand::ApplyMigration(q) => {
675                SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q))
676            }
677            SqlCommand::RollbackMigration(q) => {
678                SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q))
679            }
680            SqlCommand::ExplainMigration(q) => {
681                SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q))
682            }
683        }
684    }
685}
686
687impl<'a> Parser<'a> {
688    fn parse_events_command(&mut self) -> Result<QueryExpr, ParseError> {
689        self.expect_ident()?; // EVENTS
690        if self.consume_ident_ci("STATUS")? {
691            let mut query = TableQuery::new("red.subscriptions");
692            let collection = match self.peek().clone() {
693                Token::Ident(name) => {
694                    self.advance()?;
695                    Some(name)
696                }
697                Token::String(name) => {
698                    self.advance()?;
699                    Some(name)
700                }
701                _ => None,
702            };
703            self.parse_table_clauses(&mut query)?;
704            if let Some(collection) = collection {
705                let filter = Filter::compare(
706                    FieldRef::column("red.subscriptions", "collection"),
707                    CompareOp::Eq,
708                    Value::text(collection),
709                );
710                let expr = filter_to_expr(&filter);
711                query.where_expr = Some(match query.where_expr.take() {
712                    Some(existing) => Expr::binop(BinOp::And, existing, expr),
713                    None => expr,
714                });
715                query.filter = Some(match query.filter.take() {
716                    Some(existing) => existing.and(filter),
717                    None => filter,
718                });
719            }
720            return Ok(QueryExpr::Table(query));
721        }
722
723        if !self.consume_ident_ci("BACKFILL")? {
724            return Err(ParseError::expected(
725                vec!["BACKFILL", "STATUS"],
726                self.peek(),
727                self.position(),
728            ));
729        }
730
731        if self.consume_ident_ci("STATUS")? {
732            let collection = self.expect_ident()?;
733            return Ok(QueryExpr::EventsBackfillStatus { collection });
734        }
735
736        let collection = self.expect_ident()?;
737        let where_filter = if self.consume(&Token::Where)? {
738            let mut parts = Vec::new();
739            while !self.check(&Token::Eof) && !self.check(&Token::To) {
740                parts.push(self.peek().to_string());
741                self.advance()?;
742            }
743            if parts.is_empty() {
744                return Err(ParseError::expected(
745                    vec!["predicate"],
746                    self.peek(),
747                    self.position(),
748                ));
749            }
750            Some(parts.join(" "))
751        } else {
752            None
753        };
754
755        self.expect(Token::To)?;
756        let target_queue = self.expect_ident()?;
757        let limit = if self.consume(&Token::Limit)? {
758            Some(self.parse_positive_integer("LIMIT")? as u64)
759        } else {
760            None
761        };
762
763        Ok(QueryExpr::EventsBackfill(EventsBackfillQuery {
764            collection,
765            where_filter,
766            target_queue,
767            limit,
768        }))
769    }
770
771    /// Parse an optional `OPTIONS (key 'value', key2 'value2', ...)` clause
772    /// used by Phase 3.2 FDW DDL statements. Returns an empty vec when the
773    /// clause is absent. Values are always single-quoted string literals —
774    /// consistent with PG's generic-options model.
775    pub(crate) fn parse_fdw_options_clause(&mut self) -> Result<Vec<(String, String)>, ParseError> {
776        if !self.consume(&Token::Options)? {
777            return Ok(Vec::new());
778        }
779        self.expect(Token::LParen)?;
780        let mut out: Vec<(String, String)> = Vec::new();
781        loop {
782            // Option keys frequently collide with reserved words
783            // (`path`, `format`, `delimiter`, `header`, …) — accept
784            // the keyword form and lowercase it so downstream
785            // option-name matching stays case-insensitive.
786            let was_ident = matches!(self.peek(), Token::Ident(_));
787            let raw = self.expect_ident_or_keyword()?;
788            let key = if was_ident {
789                raw
790            } else {
791                raw.to_ascii_lowercase()
792            };
793            // Value is a single-quoted string literal.
794            let value = self.parse_string()?;
795            out.push((key, value));
796            if !self.consume(&Token::Comma)? {
797                break;
798            }
799        }
800        self.expect(Token::RParen)?;
801        Ok(out)
802    }
803
804    /// Parse any top-level frontend statement through a single shared surface.
805    pub fn parse_frontend_statement(&mut self) -> Result<FrontendStatement, ParseError> {
806        match self.peek() {
807            Token::Select => match self.parse_select_query()? {
808                QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
809                    SqlQuery::Select(query),
810                ))),
811                QueryExpr::Join(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
812                    SqlQuery::Join(query),
813                ))),
814                QueryExpr::QueueSelect(query) => Ok(FrontendStatement::QueueSelect(query)),
815                other => Err(ParseError::new(
816                    format!("internal: SELECT produced unexpected query kind {other:?}"),
817                    self.position(),
818                )),
819            },
820            Token::From
821            | Token::Insert
822            | Token::Update
823            | Token::Truncate
824            | Token::Create
825            | Token::Drop
826            | Token::Alter
827            | Token::Set
828            | Token::Begin
829            | Token::Commit
830            | Token::Rollback
831            | Token::Savepoint
832            | Token::Release
833            | Token::Start
834            | Token::Vacuum
835            | Token::Analyze
836            | Token::Copy
837            | Token::Refresh => self.parse_sql_statement().map(FrontendStatement::Sql),
838            Token::Explain => {
839                if matches!(
840                    self.peek_next()?,
841                    Token::Ident(name) if name.eq_ignore_ascii_case("ASK")
842                ) {
843                    match self.parse_explain_ask_query()? {
844                        QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
845                        other => Err(ParseError::new(
846                            format!(
847                                "internal: EXPLAIN ASK produced unexpected query kind {other:?}"
848                            ),
849                            self.position(),
850                        )),
851                    }
852                } else {
853                    self.parse_sql_statement().map(FrontendStatement::Sql)
854                }
855            }
856            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
857                self.parse_sql_statement().map(FrontendStatement::Sql)
858            }
859            Token::Desc => self.parse_sql_statement().map(FrontendStatement::Sql),
860            Token::Ident(name)
861                if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
862            {
863                self.parse_sql_statement().map(FrontendStatement::Sql)
864            }
865            Token::Ident(name)
866                if name.eq_ignore_ascii_case("GRANT")
867                    || name.eq_ignore_ascii_case("REVOKE")
868                    || name.eq_ignore_ascii_case("SIMULATE")
869                    || name.eq_ignore_ascii_case("LINT")
870                    || name.eq_ignore_ascii_case("MIGRATE")
871                    || name.eq_ignore_ascii_case("APPLY") =>
872            {
873                self.parse_sql_statement().map(FrontendStatement::Sql)
874            }
875            Token::Ident(name) if name.eq_ignore_ascii_case("WATCH") => {
876                self.advance()?;
877                if matches!(
878                    self.peek(),
879                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
880                ) {
881                    match self.parse_config_watch_after_watch()? {
882                        QueryExpr::ConfigCommand(command) => {
883                            Ok(FrontendStatement::ConfigCommand(command))
884                        }
885                        other => Err(ParseError::new(
886                            format!(
887                                "internal: WATCH CONFIG produced unexpected query kind {other:?}"
888                            ),
889                            self.position(),
890                        )),
891                    }
892                } else if matches!(
893                    self.peek(),
894                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
895                ) {
896                    match self.parse_vault_watch_after_watch()? {
897                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
898                        other => Err(ParseError::new(
899                            format!(
900                                "internal: WATCH VAULT produced unexpected query kind {other:?}"
901                            ),
902                            self.position(),
903                        )),
904                    }
905                } else {
906                    match self.parse_kv_watch(crate::catalog::CollectionModel::Kv)? {
907                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
908                        other => Err(ParseError::new(
909                            format!("internal: WATCH produced unexpected query kind {other:?}"),
910                            self.position(),
911                        )),
912                    }
913                }
914            }
915            Token::List => {
916                self.advance()?;
917                if matches!(
918                    self.peek(),
919                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
920                ) {
921                    match self.parse_config_list_after_list()? {
922                        QueryExpr::ConfigCommand(command) => {
923                            Ok(FrontendStatement::ConfigCommand(command))
924                        }
925                        other => Err(ParseError::new(
926                            format!(
927                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
928                            ),
929                            self.position(),
930                        )),
931                    }
932                } else if matches!(
933                    self.peek(),
934                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
935                ) {
936                    match self.parse_vault_list_after_list()? {
937                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
938                        other => Err(ParseError::new(
939                            format!(
940                                "internal: LIST VAULT produced unexpected query kind {other:?}"
941                            ),
942                            self.position(),
943                        )),
944                    }
945                } else {
946                    Err(ParseError::expected(
947                        vec!["CONFIG", "VAULT"],
948                        self.peek(),
949                        self.position(),
950                    ))
951                }
952            }
953            Token::Ident(name) if name.eq_ignore_ascii_case("LIST") => {
954                self.advance()?;
955                if matches!(
956                    self.peek(),
957                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
958                ) {
959                    match self.parse_config_list_after_list()? {
960                        QueryExpr::ConfigCommand(command) => {
961                            Ok(FrontendStatement::ConfigCommand(command))
962                        }
963                        other => Err(ParseError::new(
964                            format!(
965                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
966                            ),
967                            self.position(),
968                        )),
969                    }
970                } else if matches!(
971                    self.peek(),
972                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
973                ) {
974                    match self.parse_vault_list_after_list()? {
975                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
976                        other => Err(ParseError::new(
977                            format!(
978                                "internal: LIST VAULT produced unexpected query kind {other:?}"
979                            ),
980                            self.position(),
981                        )),
982                    }
983                } else {
984                    Err(ParseError::expected(
985                        vec!["CONFIG", "VAULT"],
986                        self.peek(),
987                        self.position(),
988                    ))
989                }
990            }
991            Token::Ident(name) if name.eq_ignore_ascii_case("INVALIDATE") => {
992                if matches!(
993                    self.peek_next()?,
994                    Token::Ident(next) if next.eq_ignore_ascii_case("CONFIG")
995                ) {
996                    match self.parse_config_command()? {
997                        QueryExpr::ConfigCommand(command) => {
998                            Ok(FrontendStatement::ConfigCommand(command))
999                        }
1000                        other => Err(ParseError::new(
1001                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1002                            self.position(),
1003                        )),
1004                    }
1005                } else {
1006                    self.advance()?;
1007                    match self.parse_kv_invalidate_tags_after_invalidate()? {
1008                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1009                        other => Err(ParseError::new(
1010                            format!(
1011                                "internal: INVALIDATE produced unexpected query kind {other:?}"
1012                            ),
1013                            self.position(),
1014                        )),
1015                    }
1016                }
1017            }
1018            Token::Attach | Token::Detach => self.parse_sql_statement().map(FrontendStatement::Sql),
1019            Token::Match => match self.parse_match_query()? {
1020                QueryExpr::Graph(query) => Ok(FrontendStatement::Graph(query)),
1021                other => Err(ParseError::new(
1022                    format!("internal: MATCH produced unexpected query kind {other:?}"),
1023                    self.position(),
1024                )),
1025            },
1026            Token::Path => match self.parse_path_query()? {
1027                QueryExpr::Path(query) => Ok(FrontendStatement::Path(query)),
1028                other => Err(ParseError::new(
1029                    format!("internal: PATH produced unexpected query kind {other:?}"),
1030                    self.position(),
1031                )),
1032            },
1033            Token::Vector => match self.parse_vector_query()? {
1034                QueryExpr::Vector(query) => Ok(FrontendStatement::Vector(query)),
1035                other => Err(ParseError::new(
1036                    format!("internal: VECTOR produced unexpected query kind {other:?}"),
1037                    self.position(),
1038                )),
1039            },
1040            Token::Hybrid => match self.parse_hybrid_query()? {
1041                QueryExpr::Hybrid(query) => Ok(FrontendStatement::Hybrid(query)),
1042                other => Err(ParseError::new(
1043                    format!("internal: HYBRID produced unexpected query kind {other:?}"),
1044                    self.position(),
1045                )),
1046            },
1047            Token::Graph => match self.parse_graph_command()? {
1048                QueryExpr::GraphCommand(command) => Ok(FrontendStatement::GraphCommand(command)),
1049                other => Err(ParseError::new(
1050                    format!("internal: GRAPH produced unexpected query kind {other:?}"),
1051                    self.position(),
1052                )),
1053            },
1054            Token::Search => match self.parse_search_command()? {
1055                QueryExpr::SearchCommand(command) => Ok(FrontendStatement::Search(command)),
1056                other => Err(ParseError::new(
1057                    format!("internal: SEARCH produced unexpected query kind {other:?}"),
1058                    self.position(),
1059                )),
1060            },
1061            Token::Ident(name) if name.eq_ignore_ascii_case("ASK") => {
1062                match self.parse_ask_query()? {
1063                    QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
1064                    other => Err(ParseError::new(
1065                        format!("internal: ASK produced unexpected query kind {other:?}"),
1066                        self.position(),
1067                    )),
1068                }
1069            }
1070            Token::Ident(name) if name.eq_ignore_ascii_case("UNSEAL") => {
1071                match self.parse_unseal_vault_command()? {
1072                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1073                    other => Err(ParseError::new(
1074                        format!("internal: UNSEAL VAULT produced unexpected query kind {other:?}"),
1075                        self.position(),
1076                    )),
1077                }
1078            }
1079            Token::Queue => match self.parse_queue_command()? {
1080                QueryExpr::QueueCommand(command) => Ok(FrontendStatement::QueueCommand(command)),
1081                other => Err(ParseError::new(
1082                    format!("internal: QUEUE produced unexpected query kind {other:?}"),
1083                    self.position(),
1084                )),
1085            },
1086            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
1087                match self.parse_events_command()? {
1088                    QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
1089                        SqlQuery::Select(query),
1090                    ))),
1091                    QueryExpr::EventsBackfill(query) => {
1092                        Ok(FrontendStatement::EventsBackfill(query))
1093                    }
1094                    QueryExpr::EventsBackfillStatus { collection } => {
1095                        Ok(FrontendStatement::EventsBackfillStatus { collection })
1096                    }
1097                    other => Err(ParseError::new(
1098                        format!("internal: EVENTS produced unexpected query kind {other:?}"),
1099                        self.position(),
1100                    )),
1101                }
1102            }
1103            Token::Kv => match self.parse_kv_command()? {
1104                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1105                other => Err(ParseError::new(
1106                    format!("internal: KV produced unexpected query kind {other:?}"),
1107                    self.position(),
1108                )),
1109            },
1110            Token::Delete => {
1111                if matches!(
1112                    self.peek_next()?,
1113                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
1114                ) {
1115                    match self.parse_config_command()? {
1116                        QueryExpr::ConfigCommand(command) => {
1117                            Ok(FrontendStatement::ConfigCommand(command))
1118                        }
1119                        other => Err(ParseError::new(
1120                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1121                            self.position(),
1122                        )),
1123                    }
1124                } else if matches!(
1125                    self.peek_next()?,
1126                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
1127                ) {
1128                    match self.parse_vault_lifecycle_command()? {
1129                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1130                        other => Err(ParseError::new(
1131                            format!("internal: VAULT produced unexpected query kind {other:?}"),
1132                            self.position(),
1133                        )),
1134                    }
1135                } else {
1136                    self.parse_sql_statement().map(FrontendStatement::Sql)
1137                }
1138            }
1139            Token::Add => match self.parse_config_command()? {
1140                QueryExpr::ConfigCommand(command) => Ok(FrontendStatement::ConfigCommand(command)),
1141                other => Err(ParseError::new(
1142                    format!("internal: CONFIG produced unexpected query kind {other:?}"),
1143                    self.position(),
1144                )),
1145            },
1146            Token::Purge => match self.parse_vault_lifecycle_command()? {
1147                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1148                other => Err(ParseError::new(
1149                    format!("internal: VAULT produced unexpected query kind {other:?}"),
1150                    self.position(),
1151                )),
1152            },
1153            Token::Ident(name)
1154                if name.eq_ignore_ascii_case("PUT")
1155                    || name.eq_ignore_ascii_case("GET")
1156                    || name.eq_ignore_ascii_case("RESOLVE")
1157                    || name.eq_ignore_ascii_case("ROTATE")
1158                    || name.eq_ignore_ascii_case("HISTORY")
1159                    || name.eq_ignore_ascii_case("PURGE")
1160                    || name.eq_ignore_ascii_case("INCR")
1161                    || name.eq_ignore_ascii_case("DECR")
1162                    || name.eq_ignore_ascii_case("INVALIDATE") =>
1163            {
1164                if matches!(
1165                    self.peek_next()?,
1166                    Token::Ident(next) if next.eq_ignore_ascii_case("VAULT")
1167                ) {
1168                    match self.parse_vault_lifecycle_command()? {
1169                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1170                        other => Err(ParseError::new(
1171                            format!("internal: VAULT produced unexpected query kind {other:?}"),
1172                            self.position(),
1173                        )),
1174                    }
1175                } else {
1176                    match self.parse_config_command()? {
1177                        QueryExpr::ConfigCommand(command) => {
1178                            Ok(FrontendStatement::ConfigCommand(command))
1179                        }
1180                        other => Err(ParseError::new(
1181                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1182                            self.position(),
1183                        )),
1184                    }
1185                }
1186            }
1187            Token::Ident(name) if name.eq_ignore_ascii_case("VAULT") => {
1188                match self.parse_vault_command()? {
1189                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1190                    other => Err(ParseError::new(
1191                        format!("internal: VAULT produced unexpected query kind {other:?}"),
1192                        self.position(),
1193                    )),
1194                }
1195            }
1196            Token::Tree => match self.parse_tree_command()? {
1197                QueryExpr::TreeCommand(command) => Ok(FrontendStatement::TreeCommand(command)),
1198                other => Err(ParseError::new(
1199                    format!("internal: TREE produced unexpected query kind {other:?}"),
1200                    self.position(),
1201                )),
1202            },
1203            Token::Ident(name) if name.eq_ignore_ascii_case("HLL") => {
1204                match self.parse_hll_command()? {
1205                    QueryExpr::ProbabilisticCommand(command) => {
1206                        Ok(FrontendStatement::ProbabilisticCommand(command))
1207                    }
1208                    other => Err(ParseError::new(
1209                        format!("internal: HLL produced unexpected query kind {other:?}"),
1210                        self.position(),
1211                    )),
1212                }
1213            }
1214            Token::Ident(name) if name.eq_ignore_ascii_case("SKETCH") => {
1215                match self.parse_sketch_command()? {
1216                    QueryExpr::ProbabilisticCommand(command) => {
1217                        Ok(FrontendStatement::ProbabilisticCommand(command))
1218                    }
1219                    other => Err(ParseError::new(
1220                        format!("internal: SKETCH produced unexpected query kind {other:?}"),
1221                        self.position(),
1222                    )),
1223                }
1224            }
1225            Token::Ident(name) if name.eq_ignore_ascii_case("FILTER") => {
1226                match self.parse_filter_command()? {
1227                    QueryExpr::ProbabilisticCommand(command) => {
1228                        Ok(FrontendStatement::ProbabilisticCommand(command))
1229                    }
1230                    other => Err(ParseError::new(
1231                        format!("internal: FILTER produced unexpected query kind {other:?}"),
1232                        self.position(),
1233                    )),
1234                }
1235            }
1236            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => self
1237                .parse_sql_command()
1238                .map(SqlCommand::into_statement)
1239                .map(FrontendStatement::Sql),
1240            other => Err(ParseError::expected(
1241                vec![
1242                    "SELECT", "MATCH", "PATH", "FROM", "VECTOR", "HYBRID", "INSERT", "UPDATE",
1243                    "DELETE", "TRUNCATE", "CREATE", "DROP", "ALTER", "GRAPH", "SEARCH", "ASK",
1244                    "QUEUE", "EVENTS", "KV", "HLL", "TREE", "SKETCH", "FILTER", "SET", "SHOW",
1245                    "DESCRIBE", "DESC",
1246                ],
1247                other,
1248                self.position(),
1249            )),
1250        }
1251    }
1252
1253    /// Parse any SQL/RQL-style command into the canonical SQL frontend IR.
1254    pub fn parse_sql_statement(&mut self) -> Result<SqlStatement, ParseError> {
1255        self.parse_sql_command().map(SqlCommand::into_statement)
1256    }
1257
1258    fn parse_dotted_admin_path(&mut self, lowercase: bool) -> Result<String, ParseError> {
1259        let mut path = self.expect_ident()?;
1260        while self.consume(&Token::Dot)? {
1261            let next = self.expect_ident_or_keyword()?;
1262            path = format!("{path}.{next}");
1263        }
1264        Ok(if lowercase {
1265            path.to_ascii_lowercase()
1266        } else {
1267            path
1268        })
1269    }
1270
1271    /// Parse any SQL/RQL-style command through a single frontend module.
1272    /// Parse a `CREATE ...` statement. Split out of
1273    /// [`parse_sql_command`] so its very large per-arm locals
1274    /// (every CREATE variant's query struct) live in their own
1275    /// stack frame instead of inflating the dispatcher's frame.
1276    /// `parse_sql_command` recurses (CREATE VIEW ... AS <stmt>,
1277    /// nested subqueries), so a fat dispatcher frame stacked on
1278    /// itself overflowed small (2 MiB) worker-thread stacks (#635).
1279    #[inline(never)]
1280    fn parse_create_command(&mut self) -> Result<SqlCommand, ParseError> {
1281        let pos = self.position();
1282        self.advance()?;
1283
1284        // CREATE [OR REPLACE] [MATERIALIZED] VIEW [IF NOT EXISTS] name AS <select>
1285        // Detect the VIEW path early so OR REPLACE / MATERIALIZED modifiers
1286        // don't collide with other CREATE variants (TABLE, INDEX, etc.).
1287        let mut or_replace = false;
1288        if self.consume(&Token::Or)? || self.consume_ident_ci("OR")? {
1289            let _ = self.consume_ident_ci("REPLACE")?;
1290            or_replace = true;
1291        }
1292        let materialized = self.consume(&Token::Materialized)?;
1293        if self.check(&Token::View) {
1294            self.advance()?;
1295            let if_not_exists = self.match_if_not_exists()?;
1296            let name = self.expect_ident()?;
1297            // Issue #584 slice 12 — `WITH RETENTION <duration>`
1298            // on CREATE MATERIALIZED VIEW. Parsed before `AS`
1299            // so the SELECT body parser cannot consume the
1300            // trailing `WITH` for its own (TTL / METADATA /
1301            // …) clauses. Persisted on the view definition;
1302            // the physical sweep against view-backing rows
1303            // activates with the slice-9 row-storage follow-up.
1304            let mut retention_duration_ms: Option<u64> = None;
1305            if self.check(&Token::With) {
1306                self.advance()?;
1307                if !self.consume(&Token::Retention)? && !self.consume_ident_ci("RETENTION")? {
1308                    return Err(ParseError::expected(
1309                        vec!["RETENTION"],
1310                        self.peek(),
1311                        self.position(),
1312                    ));
1313                }
1314                if !materialized {
1315                    return Err(ParseError::new(
1316                        "WITH RETENTION is only valid on \
1317                                 CREATE MATERIALIZED VIEW"
1318                            .to_string(),
1319                        self.position(),
1320                    ));
1321                }
1322                let value = self.parse_float()?;
1323                let unit_mult = self.parse_duration_unit()?;
1324                retention_duration_ms = Some((value * unit_mult).round() as u64);
1325            }
1326            // Accept `AS` — the lexer promotes it to `Token::As`
1327            // (keyword) but some paths still see it as an ident.
1328            if !self.consume(&Token::As)? && !self.consume_ident_ci("AS")? {
1329                return Err(ParseError::expected(
1330                    vec!["AS"],
1331                    self.peek(),
1332                    self.position(),
1333                ));
1334            }
1335            // Recursive parse of the body. Any QueryExpr that the
1336            // rest of the grammar accepts is valid (Select, Join, etc.).
1337            let body = self.parse_sql_command()?.into_query_expr();
1338            // Optional `REFRESH EVERY <duration>` clause on
1339            // materialized views (issue #583 slice 10). The
1340            // background scheduler reads this off the view
1341            // descriptor and ticks the view on its cadence.
1342            let mut refresh_every_ms: Option<u64> = None;
1343            if self.check(&Token::Refresh) {
1344                if !materialized {
1345                    return Err(ParseError::new(
1346                        "REFRESH EVERY is only valid on \
1347                                 CREATE MATERIALIZED VIEW"
1348                            .to_string(),
1349                        self.position(),
1350                    ));
1351                }
1352                self.advance()?;
1353                if !self.consume_ident_ci("EVERY")? {
1354                    return Err(ParseError::expected(
1355                        vec!["EVERY"],
1356                        self.peek(),
1357                        self.position(),
1358                    ));
1359                }
1360                let value = self.parse_float()?;
1361                let unit_mult = self.parse_duration_unit()?;
1362                refresh_every_ms = Some((value * unit_mult).round() as u64);
1363            }
1364            return Ok(SqlCommand::CreateView(CreateViewQuery {
1365                name,
1366                query: Box::new(body),
1367                materialized,
1368                if_not_exists,
1369                or_replace,
1370                refresh_every_ms,
1371                retention_duration_ms,
1372            }));
1373        }
1374        // If OR REPLACE / MATERIALIZED was consumed but VIEW was not,
1375        // bail out — no other CREATE form accepts those modifiers.
1376        if or_replace || materialized {
1377            return Err(ParseError::expected(
1378                vec!["VIEW"],
1379                self.peek(),
1380                self.position(),
1381            ));
1382        }
1383
1384        if self.check(&Token::Index) || self.check(&Token::Unique) {
1385            match self.parse_create_index_query()? {
1386                QueryExpr::CreateIndex(query) => Ok(SqlCommand::CreateIndex(query)),
1387                other => Err(ParseError::new(
1388                    format!("internal: CREATE INDEX produced unexpected kind {other:?}"),
1389                    self.position(),
1390                )),
1391            }
1392        } else if self.check(&Token::Table) {
1393            self.expect(Token::Table)?;
1394            match self.parse_create_table_body()? {
1395                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1396                other => Err(ParseError::new(
1397                    format!("internal: CREATE TABLE produced unexpected kind {other:?}"),
1398                    self.position(),
1399                )),
1400            }
1401        } else if self.check(&Token::Graph) {
1402            self.advance()?;
1403            match self.parse_create_collection_model_body(CollectionModel::Graph)? {
1404                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1405                other => Err(ParseError::new(
1406                    format!("internal: CREATE GRAPH produced unexpected kind {other:?}"),
1407                    self.position(),
1408                )),
1409            }
1410        } else if self.check(&Token::Document) {
1411            self.advance()?;
1412            match self.parse_create_collection_model_body(CollectionModel::Document)? {
1413                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1414                other => Err(ParseError::new(
1415                    format!("internal: CREATE DOCUMENT produced unexpected kind {other:?}"),
1416                    self.position(),
1417                )),
1418            }
1419        } else if self.check(&Token::Vector) {
1420            self.advance()?;
1421            match self.parse_create_vector_body()? {
1422                QueryExpr::CreateVector(query) => Ok(SqlCommand::CreateVector(query)),
1423                other => Err(ParseError::new(
1424                    format!("internal: CREATE VECTOR produced unexpected kind {other:?}"),
1425                    self.position(),
1426                )),
1427            }
1428        } else if self.check(&Token::Collection) {
1429            self.advance()?;
1430            match self.parse_create_collection_body()? {
1431                QueryExpr::CreateCollection(query) => Ok(SqlCommand::CreateCollection(query)),
1432                other => Err(ParseError::new(
1433                    format!("internal: CREATE COLLECTION produced unexpected kind {other:?}"),
1434                    self.position(),
1435                )),
1436            }
1437        } else if self.check(&Token::Kv) {
1438            self.advance()?;
1439            match self.parse_create_keyed_body(CollectionModel::Kv)? {
1440                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1441                other => Err(ParseError::new(
1442                    format!("internal: CREATE KV produced unexpected kind {other:?}"),
1443                    self.position(),
1444                )),
1445            }
1446        } else if self.consume_ident_ci("CONFIG")? {
1447            match self.parse_create_keyed_body(CollectionModel::Config)? {
1448                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1449                other => Err(ParseError::new(
1450                    format!("internal: CREATE CONFIG produced unexpected kind {other:?}"),
1451                    self.position(),
1452                )),
1453            }
1454        } else if self.consume_ident_ci("VAULT")? {
1455            match self.parse_create_keyed_body(CollectionModel::Vault)? {
1456                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1457                other => Err(ParseError::new(
1458                    format!("internal: CREATE VAULT produced unexpected kind {other:?}"),
1459                    self.position(),
1460                )),
1461            }
1462        } else if self.check(&Token::Timeseries) {
1463            self.advance()?;
1464            match self.parse_create_timeseries_body()? {
1465                QueryExpr::CreateTimeSeries(query) => Ok(SqlCommand::CreateTimeSeries(query)),
1466                other => Err(ParseError::new(
1467                    format!("internal: CREATE TIMESERIES produced unexpected kind {other:?}"),
1468                    self.position(),
1469                )),
1470            }
1471        } else if self.check(&Token::Metric) {
1472            self.advance()?;
1473            match self.parse_create_metric_body()? {
1474                QueryExpr::CreateMetric(query) => Ok(SqlCommand::CreateMetric(query)),
1475                other => Err(ParseError::new(
1476                    format!("internal: CREATE METRIC produced unexpected kind {other:?}"),
1477                    self.position(),
1478                )),
1479            }
1480        } else if self.consume_ident_ci("METRICS")? {
1481            match self.parse_create_metrics_body()? {
1482                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1483                other => Err(ParseError::new(
1484                    format!("internal: CREATE METRICS produced unexpected kind {other:?}"),
1485                    self.position(),
1486                )),
1487            }
1488        } else if self.consume_ident_ci("SLO")? {
1489            match self.parse_create_slo_body()? {
1490                QueryExpr::CreateSlo(query) => Ok(SqlCommand::CreateSlo(query)),
1491                other => Err(ParseError::new(
1492                    format!("internal: CREATE SLO produced unexpected kind {other:?}"),
1493                    self.position(),
1494                )),
1495            }
1496        } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE")) {
1497            self.advance()?;
1498            match self.parse_create_hypertable_body()? {
1499                QueryExpr::CreateTimeSeries(query) => Ok(SqlCommand::CreateTimeSeries(query)),
1500                other => Err(ParseError::new(
1501                    format!("internal: CREATE HYPERTABLE produced unexpected kind {other:?}"),
1502                    self.position(),
1503                )),
1504            }
1505        } else if self.check(&Token::Queue) {
1506            self.advance()?;
1507            match self.parse_create_queue_body()? {
1508                QueryExpr::CreateQueue(query) => Ok(SqlCommand::CreateQueue(query)),
1509                other => Err(ParseError::new(
1510                    format!("internal: CREATE QUEUE produced unexpected kind {other:?}"),
1511                    self.position(),
1512                )),
1513            }
1514        } else if self.check(&Token::Tree) {
1515            self.advance()?;
1516            match self.parse_create_tree_body()? {
1517                QueryExpr::CreateTree(query) => Ok(SqlCommand::CreateTree(query)),
1518                other => Err(ParseError::new(
1519                    format!("internal: CREATE TREE produced unexpected kind {other:?}"),
1520                    self.position(),
1521                )),
1522            }
1523        } else if matches!(self.peek(), Token::Ident(n) if
1524                    n.eq_ignore_ascii_case("HLL") ||
1525                    n.eq_ignore_ascii_case("SKETCH") ||
1526                    n.eq_ignore_ascii_case("FILTER"))
1527        {
1528            match self.parse_create_probabilistic()? {
1529                QueryExpr::ProbabilisticCommand(command) => Ok(SqlCommand::Probabilistic(command)),
1530                other => Err(ParseError::new(
1531                    format!("internal: CREATE probabilistic produced unexpected kind {other:?}"),
1532                    self.position(),
1533                )),
1534            }
1535        } else if self.check(&Token::Schema) {
1536            // CREATE SCHEMA [IF NOT EXISTS] name
1537            self.advance()?;
1538            let if_not_exists = self.match_if_not_exists()?;
1539            let name = self.expect_ident()?;
1540            Ok(SqlCommand::CreateSchema(CreateSchemaQuery {
1541                name,
1542                if_not_exists,
1543            }))
1544        } else if self.check(&Token::Policy) {
1545            // Two forms share the leading `CREATE POLICY` tokens:
1546            //   * IAM:   CREATE POLICY '<id>' AS '<json>'          (string literal id)
1547            //   * RLS:   CREATE POLICY <name> ON <target> ...      (bare ident name)
1548            // Disambiguate by peeking the token after POLICY.
1549            self.advance()?;
1550            if matches!(self.peek(), Token::String(_)) {
1551                // IAM form — short-circuit out of the SQL command stack.
1552                let expr = self.parse_create_iam_policy_after_keywords()?;
1553                // Inline command-wrapping: produce a synthetic SqlCommand by
1554                // routing through a generic IAM admin holder. We don't
1555                // have a dedicated SqlCommand variant for IAM yet, so we
1556                // bounce through the existing Grant-shaped Admin slot
1557                // which expects no further tokens.
1558                return Ok(SqlCommand::IamPolicy(expr));
1559            }
1560            let name = self.expect_ident()?;
1561            self.expect(Token::On)?;
1562
1563            let (target_kind, table) = {
1564                use crate::storage::query::ast::PolicyTargetKind;
1565                let kw = match self.peek() {
1566                    Token::Ident(s) => Some(s.to_ascii_uppercase()),
1567                    _ => None,
1568                };
1569                let kind = kw.as_deref().and_then(|k| match k {
1570                    "NODES" => Some(PolicyTargetKind::Nodes),
1571                    "EDGES" => Some(PolicyTargetKind::Edges),
1572                    "VECTORS" => Some(PolicyTargetKind::Vectors),
1573                    "MESSAGES" => Some(PolicyTargetKind::Messages),
1574                    "POINTS" => Some(PolicyTargetKind::Points),
1575                    "DOCUMENTS" => Some(PolicyTargetKind::Documents),
1576                    _ => None,
1577                });
1578                if let Some(k) = kind {
1579                    self.advance()?;
1580                    self.expect(Token::Of)?;
1581                    let coll = self.expect_ident()?;
1582                    (k, coll)
1583                } else {
1584                    let coll = self.expect_ident()?;
1585                    (PolicyTargetKind::Table, coll)
1586                }
1587            };
1588
1589            let action = if self.consume(&Token::For)? {
1590                let a = match self.peek() {
1591                    Token::Select => {
1592                        self.advance()?;
1593                        Some(PolicyAction::Select)
1594                    }
1595                    Token::Insert => {
1596                        self.advance()?;
1597                        Some(PolicyAction::Insert)
1598                    }
1599                    Token::Update => {
1600                        self.advance()?;
1601                        Some(PolicyAction::Update)
1602                    }
1603                    Token::Delete => {
1604                        self.advance()?;
1605                        Some(PolicyAction::Delete)
1606                    }
1607                    Token::All => {
1608                        self.advance()?;
1609                        None
1610                    }
1611                    _ => None,
1612                };
1613                a
1614            } else {
1615                None
1616            };
1617
1618            let role = if self.consume(&Token::To)? {
1619                Some(self.expect_ident()?)
1620            } else {
1621                None
1622            };
1623
1624            self.expect(Token::Using)?;
1625            self.expect(Token::LParen)?;
1626            let filter = self.parse_filter()?;
1627            self.expect(Token::RParen)?;
1628
1629            Ok(SqlCommand::CreatePolicy(CreatePolicyQuery {
1630                name,
1631                table,
1632                action,
1633                role,
1634                using: Box::new(filter),
1635                target_kind,
1636            }))
1637        } else if self.check(&Token::Server) {
1638            // CREATE SERVER [IF NOT EXISTS] name
1639            //   FOREIGN DATA WRAPPER kind
1640            //   [OPTIONS (key 'value', ...)]
1641            self.advance()?;
1642            let if_not_exists = self.match_if_not_exists()?;
1643            let name = self.expect_ident()?;
1644            self.expect(Token::Foreign)?;
1645            self.expect(Token::Data)?;
1646            self.expect(Token::Wrapper)?;
1647            let wrapper = self.expect_ident()?;
1648            let options = self.parse_fdw_options_clause()?;
1649            Ok(SqlCommand::CreateServer(CreateServerQuery {
1650                name,
1651                wrapper,
1652                options,
1653                if_not_exists,
1654            }))
1655        } else if self.check(&Token::Foreign) {
1656            // CREATE FOREIGN TABLE [IF NOT EXISTS] name (cols)
1657            //   SERVER server_name
1658            //   [OPTIONS (key 'value', ...)]
1659            self.advance()?;
1660            self.expect(Token::Table)?;
1661            let if_not_exists = self.match_if_not_exists()?;
1662            let name = self.expect_ident()?;
1663            self.expect(Token::LParen)?;
1664            let mut columns = Vec::new();
1665            loop {
1666                let col_name = self.expect_ident()?;
1667                let data_type = self.expect_ident_or_keyword()?;
1668                // Inline NOT NULL check — the CREATE TABLE path's helper is
1669                // private and coupling to it just for FDW columns isn't worth it.
1670                let mut not_null = false;
1671                if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NOT")) {
1672                    self.advance()?;
1673                    if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NULL")) {
1674                        self.advance()?;
1675                        not_null = true;
1676                    }
1677                }
1678                columns.push(ForeignColumnDef {
1679                    name: col_name,
1680                    data_type,
1681                    not_null,
1682                });
1683                if !self.consume(&Token::Comma)? {
1684                    break;
1685                }
1686            }
1687            self.expect(Token::RParen)?;
1688            self.expect(Token::Server)?;
1689            let server = self.expect_ident()?;
1690            let options = self.parse_fdw_options_clause()?;
1691            Ok(SqlCommand::CreateForeignTable(CreateForeignTableQuery {
1692                name,
1693                server,
1694                columns,
1695                options,
1696                if_not_exists,
1697            }))
1698        } else if self.check(&Token::Sequence) {
1699            // CREATE SEQUENCE [IF NOT EXISTS] name
1700            //   [START [WITH] n] [INCREMENT [BY] n]
1701            self.advance()?;
1702            let if_not_exists = self.match_if_not_exists()?;
1703            let name = self.expect_ident()?;
1704            let mut start: i64 = 1;
1705            let mut increment: i64 = 1;
1706            // Loop over optional clauses in any order.
1707            loop {
1708                if self.consume(&Token::Start)? {
1709                    // Accept `START 100` or `START WITH 100`.
1710                    let _ = self.consume(&Token::With)? || self.consume_ident_ci("WITH")?;
1711                    start = self.parse_integer()?;
1712                } else if self.consume(&Token::Increment)? {
1713                    // Accept `INCREMENT 5` or `INCREMENT BY 5`.
1714                    let _ = self.consume(&Token::By)? || self.consume_ident_ci("BY")?;
1715                    increment = self.parse_integer()?;
1716                } else {
1717                    break;
1718                }
1719            }
1720            Ok(SqlCommand::CreateSequence(CreateSequenceQuery {
1721                name,
1722                if_not_exists,
1723                start,
1724                increment,
1725            }))
1726        } else if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
1727            self.advance()?; // consume MIGRATION
1728            match self.parse_create_migration_body()? {
1729                QueryExpr::CreateMigration(q) => Ok(SqlCommand::CreateMigration(q)),
1730                other => Err(ParseError::new(
1731                    format!("internal: CREATE MIGRATION produced unexpected kind {other:?}"),
1732                    self.position(),
1733                )),
1734            }
1735        } else if let Some(reason) = analytics_v0_non_goal_create(self.peek()) {
1736            // Issue #789 — enforce Analytics v0 non-goals at the parser
1737            // surface. The parent PRD (#782) explicitly excludes generic
1738            // analytics objects, a new event storage model, cohorts,
1739            // funnels, SLA contracts, and adapters from v0. Reject these
1740            // CREATE forms here with a stable, non-goal-specific message
1741            // so accidental use surfaces an obvious "out of scope for v0"
1742            // error rather than the generic CREATE fallback.
1743            Err(ParseError::new(reason, self.position()))
1744        } else if let Some(err) =
1745            ParseError::unsupported_recognized_token(self.peek(), self.position())
1746        {
1747            Err(err)
1748        } else {
1749            Err(ParseError::expected(
1750                vec![
1751                    "TABLE",
1752                    "GRAPH",
1753                    "VECTOR",
1754                    "DOCUMENT",
1755                    "KV",
1756                    "COLLECTION",
1757                    "INDEX",
1758                    "UNIQUE",
1759                    "METRIC",
1760                    "TIMESERIES",
1761                    "QUEUE",
1762                    "TREE",
1763                    "HLL",
1764                    "SKETCH",
1765                    "FILTER",
1766                    "SCHEMA",
1767                    "SEQUENCE",
1768                    "MIGRATION",
1769                ],
1770                self.peek(),
1771                pos,
1772            ))
1773        }
1774    }
1775
1776    pub fn parse_sql_command(&mut self) -> Result<SqlCommand, ParseError> {
1777        match self.peek() {
1778            Token::Select => match self.parse_select_query()? {
1779                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1780                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1781                other => Err(ParseError::new(
1782                    format!("internal: SELECT produced unexpected query kind {other:?}"),
1783                    self.position(),
1784                )),
1785            },
1786            Token::From => match self.parse_from_query()? {
1787                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1788                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1789                other => Err(ParseError::new(
1790                    format!("internal: FROM produced unexpected query kind {other:?}"),
1791                    self.position(),
1792                )),
1793            },
1794            Token::Insert => match self.parse_insert_query()? {
1795                QueryExpr::Insert(query) => Ok(SqlCommand::Insert(query)),
1796                other => Err(ParseError::new(
1797                    format!("internal: INSERT produced unexpected query kind {other:?}"),
1798                    self.position(),
1799                )),
1800            },
1801            Token::Update => match self.parse_update_query()? {
1802                QueryExpr::Update(query) => Ok(SqlCommand::Update(query)),
1803                other => Err(ParseError::new(
1804                    format!("internal: UPDATE produced unexpected query kind {other:?}"),
1805                    self.position(),
1806                )),
1807            },
1808            Token::Delete => {
1809                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("SECRET"))
1810                {
1811                    self.advance()?; // DELETE
1812                    self.advance()?; // SECRET
1813                    let key = self.parse_dotted_admin_path(true)?;
1814                    Ok(SqlCommand::DeleteSecret { key })
1815                } else {
1816                    match self.parse_delete_query()? {
1817                        QueryExpr::Delete(query) => Ok(SqlCommand::Delete(query)),
1818                        other => Err(ParseError::new(
1819                            format!("internal: DELETE produced unexpected query kind {other:?}"),
1820                            self.position(),
1821                        )),
1822                    }
1823                }
1824            }
1825            Token::Truncate => {
1826                self.advance()?;
1827                let model = if self.consume(&Token::Table)? {
1828                    Some(CollectionModel::Table)
1829                } else if self.consume(&Token::Graph)? {
1830                    Some(CollectionModel::Graph)
1831                } else if self.consume(&Token::Vector)? {
1832                    Some(CollectionModel::Vector)
1833                } else if self.consume(&Token::Document)? {
1834                    Some(CollectionModel::Document)
1835                } else if self.consume(&Token::Timeseries)? {
1836                    Some(CollectionModel::TimeSeries)
1837                } else if self.consume_ident_ci("METRICS")? {
1838                    Some(CollectionModel::Metrics)
1839                } else if self.consume(&Token::Kv)? {
1840                    Some(CollectionModel::Kv)
1841                } else if self.consume(&Token::Queue)? {
1842                    Some(CollectionModel::Queue)
1843                } else if self.consume(&Token::Collection)? {
1844                    None
1845                } else {
1846                    return Err(ParseError::expected(
1847                        vec![
1848                            "TABLE",
1849                            "GRAPH",
1850                            "VECTOR",
1851                            "DOCUMENT",
1852                            "TIMESERIES",
1853                            "METRICS",
1854                            "KV",
1855                            "QUEUE",
1856                            "COLLECTION",
1857                        ],
1858                        self.peek(),
1859                        self.position(),
1860                    ));
1861                };
1862                match self.parse_truncate_body(model)? {
1863                    QueryExpr::Truncate(query) => Ok(SqlCommand::Truncate(query)),
1864                    other => Err(ParseError::new(
1865                        format!("internal: TRUNCATE produced unexpected kind {other:?}"),
1866                        self.position(),
1867                    )),
1868                }
1869            }
1870            Token::Explain => {
1871                // Peek ahead: EXPLAIN MIGRATION name → ExplainMigration
1872                // EXPLAIN ALTER FOR ... → ExplainAlter (existing path)
1873                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
1874                {
1875                    self.advance()?; // consume EXPLAIN
1876                    match self.parse_explain_migration_after_keyword()? {
1877                        QueryExpr::ExplainMigration(q) => Ok(SqlCommand::ExplainMigration(q)),
1878                        other => Err(ParseError::new(
1879                            format!(
1880                                "internal: EXPLAIN MIGRATION produced unexpected kind {other:?}"
1881                            ),
1882                            self.position(),
1883                        )),
1884                    }
1885                } else {
1886                    match self.parse_explain_alter_query()? {
1887                        QueryExpr::ExplainAlter(query) => Ok(SqlCommand::ExplainAlter(query)),
1888                        other => Err(ParseError::new(
1889                            format!("internal: EXPLAIN produced unexpected query kind {other:?}"),
1890                            self.position(),
1891                        )),
1892                    }
1893                }
1894            }
1895            Token::Create => self.parse_create_command(),
1896            Token::Drop => {
1897                let pos = self.position();
1898                self.advance()?;
1899
1900                // DROP [MATERIALIZED] VIEW [IF EXISTS] name
1901                let materialized = self.consume(&Token::Materialized)?;
1902                if self.check(&Token::View) {
1903                    self.advance()?;
1904                    let if_exists = self.match_if_exists()?;
1905                    let name = self.expect_ident()?;
1906                    return Ok(SqlCommand::DropView(DropViewQuery {
1907                        name,
1908                        materialized,
1909                        if_exists,
1910                    }));
1911                }
1912                if materialized {
1913                    return Err(ParseError::expected(
1914                        vec!["VIEW"],
1915                        self.peek(),
1916                        self.position(),
1917                    ));
1918                }
1919
1920                if self.check(&Token::Index) {
1921                    match self.parse_drop_index_query()? {
1922                        QueryExpr::DropIndex(query) => Ok(SqlCommand::DropIndex(query)),
1923                        other => Err(ParseError::new(
1924                            format!("internal: DROP INDEX produced unexpected kind {other:?}"),
1925                            self.position(),
1926                        )),
1927                    }
1928                } else if self.check(&Token::Table) {
1929                    self.expect(Token::Table)?;
1930                    match self.parse_drop_table_body()? {
1931                        QueryExpr::DropTable(query) => Ok(SqlCommand::DropTable(query)),
1932                        other => Err(ParseError::new(
1933                            format!("internal: DROP TABLE produced unexpected kind {other:?}"),
1934                            self.position(),
1935                        )),
1936                    }
1937                } else if self.check(&Token::Graph) {
1938                    self.advance()?;
1939                    match self.parse_drop_graph_body()? {
1940                        QueryExpr::DropGraph(query) => Ok(SqlCommand::DropGraph(query)),
1941                        other => Err(ParseError::new(
1942                            format!("internal: DROP GRAPH produced unexpected kind {other:?}"),
1943                            self.position(),
1944                        )),
1945                    }
1946                } else if self.check(&Token::Vector) {
1947                    self.advance()?;
1948                    match self.parse_drop_vector_body()? {
1949                        QueryExpr::DropVector(query) => Ok(SqlCommand::DropVector(query)),
1950                        other => Err(ParseError::new(
1951                            format!("internal: DROP VECTOR produced unexpected kind {other:?}"),
1952                            self.position(),
1953                        )),
1954                    }
1955                } else if self.check(&Token::Document) {
1956                    self.advance()?;
1957                    match self.parse_drop_document_body()? {
1958                        QueryExpr::DropDocument(query) => Ok(SqlCommand::DropDocument(query)),
1959                        other => Err(ParseError::new(
1960                            format!("internal: DROP DOCUMENT produced unexpected kind {other:?}"),
1961                            self.position(),
1962                        )),
1963                    }
1964                } else if self.check(&Token::Kv) {
1965                    self.advance()?;
1966                    match self.parse_drop_kv_body()? {
1967                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1968                        other => Err(ParseError::new(
1969                            format!("internal: DROP KV produced unexpected kind {other:?}"),
1970                            self.position(),
1971                        )),
1972                    }
1973                } else if self.consume_ident_ci("CONFIG")? {
1974                    match self.parse_drop_keyed_body(CollectionModel::Config)? {
1975                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1976                        other => Err(ParseError::new(
1977                            format!("internal: DROP CONFIG produced unexpected kind {other:?}"),
1978                            self.position(),
1979                        )),
1980                    }
1981                } else if self.consume_ident_ci("VAULT")? {
1982                    match self.parse_drop_keyed_body(CollectionModel::Vault)? {
1983                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1984                        other => Err(ParseError::new(
1985                            format!("internal: DROP VAULT produced unexpected kind {other:?}"),
1986                            self.position(),
1987                        )),
1988                    }
1989                } else if self.check(&Token::Collection) {
1990                    self.advance()?;
1991                    match self.parse_drop_collection_body()? {
1992                        QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
1993                        other => Err(ParseError::new(
1994                            format!("internal: DROP COLLECTION produced unexpected kind {other:?}"),
1995                            self.position(),
1996                        )),
1997                    }
1998                } else if self.check(&Token::Timeseries) {
1999                    self.advance()?;
2000                    match self.parse_drop_timeseries_body()? {
2001                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
2002                        other => Err(ParseError::new(
2003                            format!("internal: DROP TIMESERIES produced unexpected kind {other:?}"),
2004                            self.position(),
2005                        )),
2006                    }
2007                } else if self.consume_ident_ci("METRICS")? {
2008                    match self.parse_drop_collection_model_body(Some(CollectionModel::Metrics))? {
2009                        QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
2010                        other => Err(ParseError::new(
2011                            format!("internal: DROP METRICS produced unexpected kind {other:?}"),
2012                            self.position(),
2013                        )),
2014                    }
2015                } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
2016                {
2017                    // DROP HYPERTABLE name reuses the same AST as
2018                    // DROP TIMESERIES — runtime clears the registry
2019                    // entry *and* drops the backing collection.
2020                    self.advance()?;
2021                    match self.parse_drop_timeseries_body()? {
2022                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
2023                        other => Err(ParseError::new(
2024                            format!("internal: DROP HYPERTABLE produced unexpected kind {other:?}"),
2025                            self.position(),
2026                        )),
2027                    }
2028                } else if self.check(&Token::Queue) {
2029                    self.advance()?;
2030                    match self.parse_drop_queue_body()? {
2031                        QueryExpr::DropQueue(query) => Ok(SqlCommand::DropQueue(query)),
2032                        other => Err(ParseError::new(
2033                            format!("internal: DROP QUEUE produced unexpected kind {other:?}"),
2034                            self.position(),
2035                        )),
2036                    }
2037                } else if self.check(&Token::Tree) {
2038                    self.advance()?;
2039                    match self.parse_drop_tree_body()? {
2040                        QueryExpr::DropTree(query) => Ok(SqlCommand::DropTree(query)),
2041                        other => Err(ParseError::new(
2042                            format!("internal: DROP TREE produced unexpected kind {other:?}"),
2043                            self.position(),
2044                        )),
2045                    }
2046                } else if matches!(self.peek(), Token::Ident(n) if
2047                    n.eq_ignore_ascii_case("HLL") ||
2048                    n.eq_ignore_ascii_case("SKETCH") ||
2049                    n.eq_ignore_ascii_case("FILTER"))
2050                {
2051                    match self.parse_drop_probabilistic()? {
2052                        QueryExpr::ProbabilisticCommand(command) => {
2053                            Ok(SqlCommand::Probabilistic(command))
2054                        }
2055                        other => Err(ParseError::new(
2056                            format!(
2057                                "internal: DROP probabilistic produced unexpected kind {other:?}"
2058                            ),
2059                            self.position(),
2060                        )),
2061                    }
2062                } else if self.check(&Token::Schema) {
2063                    // DROP SCHEMA [IF EXISTS] name [CASCADE]
2064                    self.advance()?;
2065                    let if_exists = self.match_if_exists()?;
2066                    let name = self.expect_ident()?;
2067                    let cascade = self.consume(&Token::Cascade)?;
2068                    Ok(SqlCommand::DropSchema(DropSchemaQuery {
2069                        name,
2070                        if_exists,
2071                        cascade,
2072                    }))
2073                } else if self.check(&Token::Policy) {
2074                    // Two forms:
2075                    //   * IAM:   DROP POLICY '<id>'
2076                    //   * RLS:   DROP POLICY [IF EXISTS] name ON table
2077                    self.advance()?;
2078                    if matches!(self.peek(), Token::String(_)) {
2079                        let expr = self.parse_drop_iam_policy_after_keywords()?;
2080                        return Ok(SqlCommand::IamPolicy(expr));
2081                    }
2082                    let if_exists = self.match_if_exists()?;
2083                    let name = self.expect_ident()?;
2084                    self.expect(Token::On)?;
2085                    let table = self.expect_ident()?;
2086                    Ok(SqlCommand::DropPolicy(DropPolicyQuery {
2087                        name,
2088                        table,
2089                        if_exists,
2090                    }))
2091                } else if self.check(&Token::Server) {
2092                    // DROP SERVER [IF EXISTS] name [CASCADE]
2093                    self.advance()?;
2094                    let if_exists = self.match_if_exists()?;
2095                    let name = self.expect_ident()?;
2096                    let cascade = self.consume(&Token::Cascade)?;
2097                    Ok(SqlCommand::DropServer(DropServerQuery {
2098                        name,
2099                        if_exists,
2100                        cascade,
2101                    }))
2102                } else if self.check(&Token::Foreign) {
2103                    // DROP FOREIGN TABLE [IF EXISTS] name
2104                    self.advance()?;
2105                    self.expect(Token::Table)?;
2106                    let if_exists = self.match_if_exists()?;
2107                    let name = self.expect_ident()?;
2108                    Ok(SqlCommand::DropForeignTable(DropForeignTableQuery {
2109                        name,
2110                        if_exists,
2111                    }))
2112                } else if self.check(&Token::Sequence) {
2113                    // DROP SEQUENCE [IF EXISTS] name
2114                    self.advance()?;
2115                    let if_exists = self.match_if_exists()?;
2116                    let name = self.expect_ident()?;
2117                    Ok(SqlCommand::DropSequence(DropSequenceQuery {
2118                        name,
2119                        if_exists,
2120                    }))
2121                } else if let Some(err) =
2122                    ParseError::unsupported_recognized_token(self.peek(), self.position())
2123                {
2124                    Err(err)
2125                } else {
2126                    Err(ParseError::expected(
2127                        vec![
2128                            "TABLE",
2129                            "INDEX",
2130                            "TIMESERIES",
2131                            "QUEUE",
2132                            "TREE",
2133                            "HLL",
2134                            "SKETCH",
2135                            "FILTER",
2136                            "SCHEMA",
2137                            "SEQUENCE",
2138                        ],
2139                        self.peek(),
2140                        pos,
2141                    ))
2142                }
2143            }
2144            Token::Alter => {
2145                // Disambiguate ALTER USER / ALTER QUEUE / ALTER TABLE without
2146                // committing to a path until we've seen the target.
2147                // We peek the *next* token (without consuming) and
2148                // dispatch accordingly.
2149                let next = self.peek_next()?.clone();
2150                if matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("USER")) {
2151                    self.advance()?; // consume ALTER
2152                    let stmt = self.parse_alter_user_statement()?;
2153                    Ok(SqlCommand::AlterUser(stmt))
2154                } else if matches!(next, Token::Queue) {
2155                    self.advance()?; // consume ALTER
2156                    self.advance()?; // consume QUEUE
2157                    match self.parse_alter_queue_body()? {
2158                        QueryExpr::AlterQueue(query) => Ok(SqlCommand::AlterQueue(query)),
2159                        other => Err(ParseError::new(
2160                            format!("internal: ALTER QUEUE produced unexpected kind {other:?}"),
2161                            self.position(),
2162                        )),
2163                    }
2164                } else if matches!(next, Token::Metric) {
2165                    self.advance()?; // consume ALTER
2166                    self.advance()?; // consume METRIC
2167                    match self.parse_alter_metric_body()? {
2168                        QueryExpr::AlterMetric(query) => Ok(SqlCommand::AlterMetric(query)),
2169                        other => Err(ParseError::new(
2170                            format!("internal: ALTER METRIC produced unexpected kind {other:?}"),
2171                            self.position(),
2172                        )),
2173                    }
2174                } else if matches!(next, Token::Graph) {
2175                    // Issue #801 — `ALTER GRAPH name ADD|DROP ANALYTICS ...`
2176                    // shares the AlterTable AST so analytics-config lifecycle
2177                    // mutations dispatch through the existing executor path.
2178                    match self.parse_alter_graph_query()? {
2179                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2180                        other => Err(ParseError::new(
2181                            format!(
2182                                "internal: ALTER GRAPH produced unexpected query kind {other:?}"
2183                            ),
2184                            self.position(),
2185                        )),
2186                    }
2187                } else if matches!(next, Token::Table)
2188                    || matches!(next, Token::Collection)
2189                    || matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("COLLECTION"))
2190                {
2191                    // Issue #522 — `ALTER COLLECTION` shares the AlterTable
2192                    // AST so signer-registry mutations dispatch through the
2193                    // existing executor. The DDL parser body accepts either
2194                    // keyword interchangeably for the open-vocabulary alters
2195                    // we own (currently `ADD|REVOKE SIGNER`).
2196                    match self.parse_alter_table_query()? {
2197                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2198                        other => Err(ParseError::new(
2199                            format!(
2200                                "internal: ALTER TABLE produced unexpected query kind {other:?}"
2201                            ),
2202                            self.position(),
2203                        )),
2204                    }
2205                } else if let Some(err) =
2206                    ParseError::unsupported_recognized_token(&next, self.position())
2207                {
2208                    Err(err)
2209                } else {
2210                    match self.parse_alter_table_query()? {
2211                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2212                        other => Err(ParseError::new(
2213                            format!("internal: ALTER produced unexpected query kind {other:?}"),
2214                            self.position(),
2215                        )),
2216                    }
2217                }
2218            }
2219            Token::Ident(name) if name.eq_ignore_ascii_case("GRANT") => {
2220                let stmt = self.parse_grant_statement()?;
2221                Ok(SqlCommand::Grant(stmt))
2222            }
2223            Token::Ident(name) if name.eq_ignore_ascii_case("REVOKE") => {
2224                let stmt = self.parse_revoke_statement()?;
2225                Ok(SqlCommand::Revoke(stmt))
2226            }
2227            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
2228                self.advance()?;
2229                if self.consume_ident_ci("BACKFILL")? {
2230                    return Err(ParseError::new(
2231                        "EVENTS BACKFILL STATUS is not implemented; EVENTS BACKFILL runtime is available but durable progress tracking is not"
2232                            .to_string(),
2233                        self.position(),
2234                    ));
2235                }
2236                if !self.consume_ident_ci("STATUS")? {
2237                    return Err(ParseError::expected(
2238                        vec!["STATUS"],
2239                        self.peek(),
2240                        self.position(),
2241                    ));
2242                }
2243
2244                let mut query = TableQuery::new("red.subscriptions");
2245                let collection = match self.peek().clone() {
2246                    Token::Ident(name) => {
2247                        self.advance()?;
2248                        Some(name)
2249                    }
2250                    Token::String(name) => {
2251                        self.advance()?;
2252                        Some(name)
2253                    }
2254                    _ => None,
2255                };
2256                self.parse_table_clauses(&mut query)?;
2257                if let Some(collection) = collection {
2258                    let filter = Filter::compare(
2259                        FieldRef::column("red.subscriptions", "collection"),
2260                        CompareOp::Eq,
2261                        Value::text(collection),
2262                    );
2263                    let expr = filter_to_expr(&filter);
2264                    query.where_expr = Some(match query.where_expr.take() {
2265                        Some(existing) => Expr::binop(BinOp::And, existing, expr),
2266                        None => expr,
2267                    });
2268                    query.filter = Some(match query.filter.take() {
2269                        Some(existing) => existing.and(filter),
2270                        None => filter,
2271                    });
2272                }
2273                Ok(SqlCommand::Select(query))
2274            }
2275            Token::Attach => {
2276                let expr = self.parse_attach_policy()?;
2277                Ok(SqlCommand::IamPolicy(expr))
2278            }
2279            Token::Detach => {
2280                let expr = self.parse_detach_policy()?;
2281                Ok(SqlCommand::IamPolicy(expr))
2282            }
2283            Token::Ident(name) if name.eq_ignore_ascii_case("SIMULATE") => {
2284                let expr = self.parse_simulate_policy()?;
2285                Ok(SqlCommand::IamPolicy(expr))
2286            }
2287            Token::Ident(name) if name.eq_ignore_ascii_case("LINT") => {
2288                let expr = self.parse_lint_policy()?;
2289                Ok(SqlCommand::IamPolicy(expr))
2290            }
2291            Token::Ident(name) if name.eq_ignore_ascii_case("MIGRATE") => {
2292                // `MIGRATE POLICY MODE TO ...` is the S5B (#714) path.
2293                // Lookahead one token because `MIGRATE` is otherwise
2294                // unused at this layer.
2295                let next = self.peek_next()?.clone();
2296                let is_policy_mode = matches!(&next, Token::Policy)
2297                    || matches!(&next, Token::Ident(name)
2298                        if name.eq_ignore_ascii_case("POLICY"));
2299                if is_policy_mode {
2300                    let expr = self.parse_migrate_policy_mode()?;
2301                    return Ok(SqlCommand::IamPolicy(expr));
2302                }
2303                Err(ParseError::expected(
2304                    vec!["POLICY"],
2305                    self.peek(),
2306                    self.position(),
2307                ))
2308            }
2309            Token::Set => {
2310                self.advance()?;
2311                if self.consume_ident_ci("CONFIG")? {
2312                    let full_key = self.parse_dotted_admin_path(true)?;
2313                    self.expect(Token::Eq)?;
2314                    let value = self.parse_literal_value()?;
2315                    Ok(SqlCommand::SetConfig {
2316                        key: full_key,
2317                        value,
2318                    })
2319                } else if self.consume_ident_ci("SECRET")? {
2320                    let key = self.parse_dotted_admin_path(true)?;
2321                    self.expect(Token::Eq)?;
2322                    let value = self.parse_literal_value()?;
2323                    Ok(SqlCommand::SetSecret { key, value })
2324                } else if self.consume_ident_ci("TENANT")? {
2325                    // SET TENANT 'id'  |  SET TENANT = 'id'  |
2326                    // SET TENANT NULL  |  SET TENANT = NULL
2327                    let _ = self.consume(&Token::Eq)?;
2328                    if self.consume_ident_ci("NULL")? {
2329                        Ok(SqlCommand::SetTenant(None))
2330                    } else {
2331                        let value = self.parse_literal_value()?;
2332                        match value {
2333                            Value::Text(s) => Ok(SqlCommand::SetTenant(Some(s.to_string()))),
2334                            Value::Null => Ok(SqlCommand::SetTenant(None)),
2335                            other => Err(ParseError::new(
2336                                format!("SET TENANT expects a text literal or NULL, got {other:?}"),
2337                                self.position(),
2338                            )),
2339                        }
2340                    }
2341                } else {
2342                    Err(ParseError::expected(
2343                        vec!["CONFIG", "SECRET", "TENANT"],
2344                        self.peek(),
2345                        self.position(),
2346                    ))
2347                }
2348            }
2349            Token::Ident(name) if name.eq_ignore_ascii_case("APPLY") => {
2350                self.advance()?;
2351                match self.parse_apply_migration()? {
2352                    QueryExpr::ApplyMigration(q) => Ok(SqlCommand::ApplyMigration(q)),
2353                    other => Err(ParseError::new(
2354                        format!("internal: APPLY MIGRATION produced unexpected kind {other:?}"),
2355                        self.position(),
2356                    )),
2357                }
2358            }
2359            Token::Ident(name) if name.eq_ignore_ascii_case("RESET") => {
2360                // RESET TENANT — session-local clear
2361                self.advance()?;
2362                if self.consume_ident_ci("TENANT")? {
2363                    Ok(SqlCommand::SetTenant(None))
2364                } else {
2365                    Err(ParseError::expected(
2366                        vec!["TENANT"],
2367                        self.peek(),
2368                        self.position(),
2369                    ))
2370                }
2371            }
2372            Token::Ident(name)
2373                if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
2374            {
2375                self.advance()?;
2376                let collection = self.parse_dotted_admin_path(false)?;
2377                let mut query = TableQuery::new("red.describe");
2378                query.filter = Some(Filter::compare(
2379                    FieldRef::column("", "collection"),
2380                    CompareOp::Eq,
2381                    Value::text(collection),
2382                ));
2383                Ok(SqlCommand::Select(query))
2384            }
2385            Token::Desc => {
2386                self.advance()?;
2387                let collection = self.parse_dotted_admin_path(false)?;
2388                let mut query = TableQuery::new("red.describe");
2389                query.filter = Some(Filter::compare(
2390                    FieldRef::column("", "collection"),
2391                    CompareOp::Eq,
2392                    Value::text(collection),
2393                ));
2394                Ok(SqlCommand::Select(query))
2395            }
2396            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
2397                self.advance()?;
2398                if self.consume(&Token::Create)? || self.consume_ident_ci("CREATE")? {
2399                    if !(self.consume(&Token::Table)? || self.consume_ident_ci("TABLE")?) {
2400                        return Err(ParseError::expected(
2401                            vec!["TABLE"],
2402                            self.peek(),
2403                            self.position(),
2404                        ));
2405                    }
2406                    let collection = self.parse_dotted_admin_path(false)?;
2407                    let mut query = TableQuery::new("red.show_create");
2408                    query.filter = Some(Filter::compare(
2409                        FieldRef::column("", "collection"),
2410                        CompareOp::Eq,
2411                        Value::text(collection),
2412                    ));
2413                    Ok(SqlCommand::Select(query))
2414                } else if self.consume_ident_ci("CONFIG")? {
2415                    // Accept dotted prefixes the same way SET CONFIG does
2416                    // (`SHOW CONFIG durability.mode`), and empty prefix
2417                    // (`SHOW CONFIG`) for a catalog-wide listing.
2418                    let prefix = if !self.check(&Token::Eof) {
2419                        let first = self.expect_ident()?;
2420                        let mut full = first;
2421                        while self.consume(&Token::Dot)? {
2422                            let next = self.expect_ident_or_keyword()?;
2423                            full = format!("{full}.{next}");
2424                        }
2425                        // Match SET CONFIG: lowercase so keyword segments
2426                        // come out consistent with the stored keys.
2427                        Some(full.to_ascii_lowercase())
2428                    } else {
2429                        None
2430                    };
2431                    Ok(SqlCommand::ShowConfig { prefix })
2432                } else if self.consume_ident_ci("COLLECTIONS")? {
2433                    let mut query = TableQuery::new("red.collections");
2434                    let include_internal = if self.consume_ident_ci("INCLUDING")? {
2435                        if !self.consume_ident_ci("INTERNAL")? {
2436                            return Err(ParseError::expected(
2437                                vec!["INTERNAL"],
2438                                self.peek(),
2439                                self.position(),
2440                            ));
2441                        }
2442                        true
2443                    } else {
2444                        false
2445                    };
2446                    self.parse_table_clauses(&mut query)?;
2447                    if !include_internal {
2448                        let user_filter = query.filter.take();
2449                        let hide_internal = crate::storage::query::ast::Filter::Compare {
2450                            field: FieldRef::column("", "internal"),
2451                            op: CompareOp::Eq,
2452                            value: Value::Boolean(false),
2453                        };
2454                        query.filter = Some(match user_filter {
2455                            Some(filter) => filter.and(hide_internal),
2456                            None => hide_internal,
2457                        });
2458                    }
2459                    Ok(SqlCommand::Select(query))
2460                } else if self.consume_ident_ci("TABLES")? {
2461                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2462                        self, "table",
2463                    )?))
2464                } else if self.consume_ident_ci("QUEUES")? {
2465                    // Issue #535 — `SHOW QUEUES` desugars to the
2466                    // `red.queues` virtual table (queue-shaped
2467                    // columns), not the filtered `red.collections`
2468                    // view. `INCLUDING INTERNAL` mirrors the
2469                    // `SHOW COLLECTIONS` opt-in: without it, DLQ
2470                    // targets and other auto-created queues are
2471                    // hidden via the `internal = false` filter.
2472                    let mut query = TableQuery::new("red.queues");
2473                    let include_internal = if self.consume_ident_ci("INCLUDING")? {
2474                        if !self.consume_ident_ci("INTERNAL")? {
2475                            return Err(ParseError::expected(
2476                                vec!["INTERNAL"],
2477                                self.peek(),
2478                                self.position(),
2479                            ));
2480                        }
2481                        true
2482                    } else {
2483                        false
2484                    };
2485                    self.parse_table_clauses(&mut query)?;
2486                    if !include_internal {
2487                        let hide_internal = Filter::Compare {
2488                            field: FieldRef::column("", "internal"),
2489                            op: CompareOp::Eq,
2490                            value: Value::Boolean(false),
2491                        };
2492                        add_table_filter(&mut query, hide_internal);
2493                    }
2494                    Ok(SqlCommand::Select(query))
2495                } else if self.consume(&Token::Vectors)? || self.consume_ident_ci("VECTORS")? {
2496                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2497                        self, "vector",
2498                    )?))
2499                } else if self.consume_ident_ci("DOCUMENTS")? {
2500                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2501                        self, "document",
2502                    )?))
2503                } else if self.consume(&Token::Timeseries)?
2504                    || self.consume_ident_ci("TIMESERIES")?
2505                {
2506                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2507                        self,
2508                        "timeseries",
2509                    )?))
2510                } else if self.consume_ident_ci("GRAPHS")? {
2511                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2512                        self, "graph",
2513                    )?))
2514                } else if self.consume_ident_ci("CONFIGS")? {
2515                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2516                        self, "config",
2517                    )?))
2518                } else if self.consume_ident_ci("VAULTS")? {
2519                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2520                        self, "vault",
2521                    )?))
2522                } else if self.consume(&Token::Kv)?
2523                    || self.consume_ident_ci("KV")?
2524                    || self.consume_ident_ci("KVS")?
2525                {
2526                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2527                        self, "kv",
2528                    )?))
2529                } else if self.consume(&Token::Schema)? || self.consume_ident_ci("SCHEMA")? {
2530                    let collection = self.parse_dotted_admin_path(false)?;
2531                    let mut query = TableQuery::new("red.columns");
2532                    query.filter = Some(Filter::compare(
2533                        FieldRef::column("", "collection"),
2534                        CompareOp::Eq,
2535                        Value::text(collection),
2536                    ));
2537                    Ok(SqlCommand::Select(query))
2538                } else if self.consume_ident_ci("INDICES")? || self.consume_ident_ci("INDEXES")? {
2539                    let mut query = TableQuery::new("red.show_indexes");
2540                    if self.consume(&Token::On)? {
2541                        let collection = self.expect_ident_or_keyword()?;
2542                        let filter = Filter::Compare {
2543                            field: FieldRef::column("", "table"),
2544                            op: CompareOp::Eq,
2545                            value: Value::text(collection),
2546                        };
2547                        query.where_expr = Some(filter_to_expr(&filter));
2548                        query.filter = Some(filter);
2549                    }
2550                    self.parse_table_clauses(&mut query)?;
2551                    Ok(SqlCommand::Select(query))
2552                } else if self.consume_ident_ci("POLICIES")? {
2553                    if self.consume(&Token::For)? || self.consume_ident_ci("FOR")? {
2554                        let principal = self.parse_iam_principal_kind()?;
2555                        return Ok(SqlCommand::IamPolicy(QueryExpr::ShowPolicies {
2556                            filter: Some(principal),
2557                        }));
2558                    }
2559                    let mut query = TableQuery::new("red.policies");
2560                    let collection_filter =
2561                        if self.consume(&Token::On)? || self.consume_ident_ci("ON")? {
2562                            let collection = self.parse_dotted_admin_path(false)?;
2563                            Some(Filter::Compare {
2564                                field: FieldRef::TableColumn {
2565                                    table: String::new(),
2566                                    column: "collection".to_string(),
2567                                },
2568                                op: CompareOp::Eq,
2569                                value: Value::text(collection),
2570                            })
2571                        } else {
2572                            None
2573                        };
2574                    self.parse_table_clauses(&mut query)?;
2575                    if let Some(collection_filter) = collection_filter {
2576                        let combined = match query.filter.take() {
2577                            Some(existing) => {
2578                                Filter::And(Box::new(collection_filter), Box::new(existing))
2579                            }
2580                            None => collection_filter,
2581                        };
2582                        query.where_expr = Some(filter_to_expr(&combined));
2583                        query.filter = Some(combined);
2584                    }
2585                    Ok(SqlCommand::Select(query))
2586                } else if self.consume_ident_ci("STATS")? {
2587                    let mut query = TableQuery::new("red.stats");
2588                    let collection = match self.peek().clone() {
2589                        Token::Ident(name) => {
2590                            self.advance()?;
2591                            Some(name)
2592                        }
2593                        Token::String(name) => {
2594                            self.advance()?;
2595                            Some(name)
2596                        }
2597                        _ => None,
2598                    };
2599                    self.parse_table_clauses(&mut query)?;
2600                    if let Some(collection) = collection {
2601                        let filter = Filter::compare(
2602                            FieldRef::column("red.stats", "collection"),
2603                            CompareOp::Eq,
2604                            Value::text(collection),
2605                        );
2606                        let expr = filter_to_expr(&filter);
2607                        query.where_expr = Some(match query.where_expr.take() {
2608                            Some(existing) => Expr::binop(BinOp::And, existing, expr),
2609                            None => expr,
2610                        });
2611                        query.filter = Some(match query.filter.take() {
2612                            Some(existing) => existing.and(filter),
2613                            None => filter,
2614                        });
2615                    }
2616                    Ok(SqlCommand::Select(query))
2617                } else if self.consume_ident_ci("SAMPLE")? {
2618                    let mut query = TableQuery::new(&self.expect_ident()?);
2619                    query.limit = if self.consume(&Token::Limit)? {
2620                        Some(self.parse_integer()? as u64)
2621                    } else {
2622                        Some(10)
2623                    };
2624                    Ok(SqlCommand::Select(query))
2625                } else if self.consume_ident_ci("SECRET")? || self.consume_ident_ci("SECRETS")? {
2626                    let prefix = if !self.check(&Token::Eof) {
2627                        Some(self.parse_dotted_admin_path(true)?)
2628                    } else {
2629                        None
2630                    };
2631                    Ok(SqlCommand::ShowSecrets { prefix })
2632                } else if self.consume_ident_ci("TENANT")? {
2633                    Ok(SqlCommand::ShowTenant)
2634                } else if let Some(expr) = self.parse_show_iam_after_show()? {
2635                    Ok(SqlCommand::IamPolicy(expr))
2636                } else {
2637                    Err(ParseError::expected(
2638                        vec![
2639                            "CONFIG",
2640                            "SECRET",
2641                            "SECRETS",
2642                            "COLLECTIONS",
2643                            "TABLES",
2644                            "QUEUES",
2645                            "VECTORS",
2646                            "DOCUMENTS",
2647                            "TIMESERIES",
2648                            "GRAPHS",
2649                            "KV",
2650                            "SCHEMA",
2651                            "INDICES",
2652                            "INDEXES",
2653                            "SAMPLE",
2654                            "POLICIES",
2655                            "STATS",
2656                            "TENANT",
2657                            "EFFECTIVE",
2658                        ],
2659                        self.peek(),
2660                        self.position(),
2661                    ))
2662                }
2663            }
2664            // Transaction control statements (Phase 1.1 PG parity).
2665            // BEGIN [WORK | TRANSACTION] [ISOLATION LEVEL <mode>]
2666            // START TRANSACTION [ISOLATION LEVEL <mode>]
2667            //
2668            // We only implement SNAPSHOT ISOLATION (our default). We
2669            // accept READ UNCOMMITTED / READ COMMITTED / REPEATABLE
2670            // READ / SNAPSHOT as PG-compatible no-ops, but reject
2671            // SERIALIZABLE outright — the previous behaviour of
2672            // silently degrading to snapshot made the parser
2673            // dishonest. Real SSI (Serializable Snapshot Isolation)
2674            // is tracked as a future milestone.
2675            Token::Begin | Token::Start => {
2676                self.advance()?;
2677                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2678                // Optional ISOLATION LEVEL clause.
2679                if self.consume_ident_ci("ISOLATION")? {
2680                    self.expect(Token::Level)?;
2681                    // The level identifier can span multiple words
2682                    // (READ UNCOMMITTED / READ COMMITTED / REPEATABLE
2683                    // READ). Collect them case-insensitively.
2684                    let mut parts: Vec<String> = Vec::new();
2685                    if self.consume_ident_ci("READ")? {
2686                        parts.push("READ".to_string());
2687                        if self.consume_ident_ci("UNCOMMITTED")? {
2688                            parts.push("UNCOMMITTED".to_string());
2689                        } else if self.consume_ident_ci("COMMITTED")? {
2690                            parts.push("COMMITTED".to_string());
2691                        } else {
2692                            return Err(ParseError::expected(
2693                                vec!["UNCOMMITTED", "COMMITTED"],
2694                                self.peek(),
2695                                self.position(),
2696                            ));
2697                        }
2698                    } else if self.consume_ident_ci("REPEATABLE")? {
2699                        parts.push("REPEATABLE".to_string());
2700                        if !self.consume_ident_ci("READ")? {
2701                            return Err(ParseError::expected(
2702                                vec!["READ"],
2703                                self.peek(),
2704                                self.position(),
2705                            ));
2706                        }
2707                        parts.push("READ".to_string());
2708                    } else if self.consume_ident_ci("SNAPSHOT")? {
2709                        parts.push("SNAPSHOT".to_string());
2710                    } else if self.consume_ident_ci("SERIALIZABLE")? {
2711                        return Err(ParseError::new(
2712                            "ISOLATION LEVEL SERIALIZABLE is not yet supported — reddb \
2713                             currently provides SNAPSHOT ISOLATION (which PG calls \
2714                             REPEATABLE READ). Use REPEATABLE READ / SNAPSHOT / \
2715                             READ COMMITTED, or omit ISOLATION LEVEL for the default."
2716                                .to_string(),
2717                            self.position(),
2718                        ));
2719                    } else {
2720                        return Err(ParseError::expected(
2721                            vec!["READ", "REPEATABLE", "SNAPSHOT", "SERIALIZABLE"],
2722                            self.peek(),
2723                            self.position(),
2724                        ));
2725                    }
2726                    // All accepted modes map to our snapshot engine today.
2727                    let _ = parts;
2728                }
2729                Ok(SqlCommand::TransactionControl(TxnControl::Begin))
2730            }
2731            // COMMIT [WORK | TRANSACTION]
2732            Token::Commit => {
2733                self.advance()?;
2734                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2735                Ok(SqlCommand::TransactionControl(TxnControl::Commit))
2736            }
2737            // ROLLBACK [WORK | TRANSACTION] [TO [SAVEPOINT] name]
2738            // ROLLBACK MIGRATION name
2739            Token::Rollback => {
2740                self.advance()?;
2741                if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
2742                    match self.parse_rollback_migration_after_keyword()? {
2743                        QueryExpr::RollbackMigration(q) => Ok(SqlCommand::RollbackMigration(q)),
2744                        other => Err(ParseError::new(
2745                            format!(
2746                                "internal: ROLLBACK MIGRATION produced unexpected kind {other:?}"
2747                            ),
2748                            self.position(),
2749                        )),
2750                    }
2751                } else {
2752                    let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2753                    if self.consume(&Token::To)? {
2754                        let _ = self.consume(&Token::Savepoint)?;
2755                        let name = self.expect_ident()?;
2756                        Ok(SqlCommand::TransactionControl(
2757                            TxnControl::RollbackToSavepoint(name),
2758                        ))
2759                    } else {
2760                        Ok(SqlCommand::TransactionControl(TxnControl::Rollback))
2761                    }
2762                }
2763            }
2764            // SAVEPOINT name
2765            Token::Savepoint => {
2766                self.advance()?;
2767                let name = self.expect_ident()?;
2768                Ok(SqlCommand::TransactionControl(TxnControl::Savepoint(name)))
2769            }
2770            // RELEASE [SAVEPOINT] name
2771            Token::Release => {
2772                self.advance()?;
2773                let _ = self.consume(&Token::Savepoint)?;
2774                let name = self.expect_ident()?;
2775                Ok(SqlCommand::TransactionControl(
2776                    TxnControl::ReleaseSavepoint(name),
2777                ))
2778            }
2779            // VACUUM [FULL] [table]
2780            Token::Vacuum => {
2781                self.advance()?;
2782                let full = self.consume(&Token::Full)?;
2783                let target = if self.check(&Token::Eof) {
2784                    None
2785                } else {
2786                    Some(self.expect_ident()?)
2787                };
2788                Ok(SqlCommand::Maintenance(MaintenanceCommand::Vacuum {
2789                    target,
2790                    full,
2791                }))
2792            }
2793            // REFRESH MATERIALIZED VIEW name
2794            Token::Refresh => {
2795                self.advance()?;
2796                self.expect(Token::Materialized)?;
2797                self.expect(Token::View)?;
2798                let name = self.expect_ident()?;
2799                Ok(SqlCommand::RefreshMaterializedView(
2800                    RefreshMaterializedViewQuery { name },
2801                ))
2802            }
2803            // ANALYZE [table]
2804            Token::Analyze => {
2805                self.advance()?;
2806                let target = if self.check(&Token::Eof) {
2807                    None
2808                } else {
2809                    Some(self.expect_ident()?)
2810                };
2811                Ok(SqlCommand::Maintenance(MaintenanceCommand::Analyze {
2812                    target,
2813                }))
2814            }
2815            // COPY table FROM 'path' [WITH (...)] [DELIMITER 'x'] [HEADER]
2816            //
2817            // Accepts both PG-style `WITH (FORMAT csv, HEADER true)` and the
2818            // short-form `DELIMITER ',' HEADER`. The only supported format
2819            // today is CSV.
2820            Token::Copy => {
2821                self.advance()?;
2822                let table = self.expect_ident()?;
2823                self.expect(Token::From)?;
2824                let path = self.parse_string()?;
2825
2826                let mut delimiter: Option<char> = None;
2827                let mut has_header = false;
2828                let format = CopyFormat::Csv;
2829
2830                // Optional `WITH (FORMAT csv, HEADER true, DELIMITER ',')` block.
2831                // `WITH` is a reserved keyword token — accept both the keyword
2832                // form and the ident form that non-CTE callers sometimes emit.
2833                if self.consume(&Token::With)? || self.consume_ident_ci("WITH")? {
2834                    self.expect(Token::LParen)?;
2835                    loop {
2836                        if self.consume(&Token::Format)? || self.consume_ident_ci("FORMAT")? {
2837                            let _ = self.consume(&Token::Eq)?;
2838                            // Only CSV for now — accept the ident and move on.
2839                            let _ = self.expect_ident()?;
2840                        } else if self.consume(&Token::Header)? {
2841                            let _ = self.consume(&Token::Eq)?;
2842                            // Accept `HEADER`, `HEADER = true`, `HEADER = false`,
2843                            // or an ident spelling of true/false.
2844                            has_header = match self.peek().clone() {
2845                                Token::True => {
2846                                    self.advance()?;
2847                                    true
2848                                }
2849                                Token::False => {
2850                                    self.advance()?;
2851                                    false
2852                                }
2853                                Token::Ident(ref n) if n.eq_ignore_ascii_case("true") => {
2854                                    self.advance()?;
2855                                    true
2856                                }
2857                                Token::Ident(ref n) if n.eq_ignore_ascii_case("false") => {
2858                                    self.advance()?;
2859                                    false
2860                                }
2861                                _ => true,
2862                            };
2863                        } else if self.consume(&Token::Delimiter)? {
2864                            let _ = self.consume(&Token::Eq)?;
2865                            let s = self.parse_string()?;
2866                            delimiter = s.chars().next();
2867                        } else {
2868                            break;
2869                        }
2870                        if !self.consume(&Token::Comma)? {
2871                            break;
2872                        }
2873                    }
2874                    self.expect(Token::RParen)?;
2875                }
2876
2877                // Short form clauses outside WITH (in either order).
2878                loop {
2879                    if self.consume(&Token::Delimiter)? {
2880                        let s = self.parse_string()?;
2881                        delimiter = s.chars().next();
2882                    } else if self.consume(&Token::Header)? {
2883                        has_header = true;
2884                    } else {
2885                        break;
2886                    }
2887                }
2888
2889                Ok(SqlCommand::CopyFrom(CopyFromQuery {
2890                    table,
2891                    path,
2892                    format,
2893                    delimiter,
2894                    has_header,
2895                }))
2896            }
2897            other => Err(ParseError::expected(
2898                vec![
2899                    "SELECT",
2900                    "FROM",
2901                    "INSERT",
2902                    "UPDATE",
2903                    "DELETE",
2904                    "EXPLAIN",
2905                    "CREATE",
2906                    "DROP",
2907                    "ALTER",
2908                    "SET",
2909                    "SHOW",
2910                    "BEGIN",
2911                    "COMMIT",
2912                    "ROLLBACK",
2913                    "SAVEPOINT",
2914                    "RELEASE",
2915                    "START",
2916                    "VACUUM",
2917                    "ANALYZE",
2918                    "COPY",
2919                    "REFRESH",
2920                    "DESCRIBE",
2921                    "DESC",
2922                ],
2923                other,
2924                self.position(),
2925            )),
2926        }
2927    }
2928}