Skip to main content

reddb_server/storage/query/
sql.rs

1use crate::catalog::CollectionModel;
2use crate::storage::query::ast::{
3    AlterMetricQuery, AlterQueueQuery, AlterTableQuery, AlterUserStmt, ApplyMigrationQuery,
4    AskQuery, BinOp, CompareOp, ConfigCommand, CopyFormat, CopyFromQuery, CreateCollectionQuery,
5    CreateForeignTableQuery, CreateIndexQuery, CreateMetricQuery, CreateMigrationQuery,
6    CreatePolicyQuery, CreateQueueQuery, CreateSchemaQuery, CreateSequenceQuery, CreateServerQuery,
7    CreateSloQuery, CreateTableQuery, CreateTimeSeriesQuery, CreateTreeQuery, CreateVectorQuery,
8    CreateViewQuery, DeleteQuery, DropCollectionQuery, DropDocumentQuery, DropForeignTableQuery,
9    DropGraphQuery, DropIndexQuery, DropKvQuery, DropPolicyQuery, DropQueueQuery, DropSchemaQuery,
10    DropSequenceQuery, DropServerQuery, DropTableQuery, DropTimeSeriesQuery, DropTreeQuery,
11    DropVectorQuery, DropViewQuery, EventsBackfillQuery, ExplainAlterQuery, ExplainMigrationQuery,
12    Expr, FieldRef, Filter, ForeignColumnDef, GrantStmt, GraphCommand, GraphQuery, HybridQuery,
13    InsertQuery, JoinQuery, KvCommand, MaintenanceCommand, PathQuery, PolicyAction,
14    ProbabilisticCommand, QueryExpr, QueueCommand, QueueSelectQuery, RankOfQuery, RankRangeQuery,
15    RefreshMaterializedViewQuery, RevokeStmt, RollbackMigrationQuery, SearchCommand, Span,
16    TableQuery, TreeCommand, TruncateQuery, TxnControl, UpdateQuery, VectorQuery,
17};
18use crate::storage::query::parser::{ParseError, Parser, SafeTokenDisplay};
19use crate::storage::query::sql_lowering::filter_to_expr;
20use crate::storage::query::Token;
21use crate::storage::schema::Value;
22
/// Canonical SQL frontend command surface.
///
/// This is the single entrypoint for SQL/RQL-style commands before they are
/// lowered into the broader multi-backend `QueryExpr` space.
///
/// The statement is grouped into four categories, each carried by its own
/// payload enum. `into_command` flattens the two-level shape into a
/// `SqlCommand`, and `into_query_expr` lowers it the rest of the way.
#[derive(Debug, Clone)]
pub enum SqlStatement {
    /// Select / join statements (see `SqlQuery`).
    Query(SqlQuery),
    /// Insert / update / delete statements (see `SqlMutation`).
    Mutation(SqlMutation),
    /// Schema-category commands (see `SqlSchemaCommand`).
    Schema(SqlSchemaCommand),
    /// Admin-category commands (see `SqlAdminCommand`).
    Admin(SqlAdminCommand),
}
34
/// Statement returned by `parse_frontend`.
///
/// Covers the SQL surface (`Sql`) plus every other query/command shape the
/// frontend parser can produce. `into_query_expr` lowers each variant to a
/// `QueryExpr`; unless noted below, the payload is wrapped in the
/// `QueryExpr` variant of the same name.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum FrontendStatement {
    /// A SQL statement; lowered through `SqlStatement::into_query_expr`.
    Sql(SqlStatement),
    Graph(GraphQuery),
    GraphCommand(GraphCommand),
    Path(PathQuery),
    Vector(VectorQuery),
    Hybrid(HybridQuery),
    /// Lowered to `QueryExpr::SearchCommand`.
    Search(SearchCommand),
    Ask(AskQuery),
    QueueSelect(QueueSelectQuery),
    QueueCommand(QueueCommand),
    EventsBackfill(EventsBackfillQuery),
    /// Lowered to `QueryExpr::EventsBackfillStatus` with the same `collection`.
    EventsBackfillStatus { collection: String },
    TreeCommand(TreeCommand),
    ProbabilisticCommand(ProbabilisticCommand),
    KvCommand(KvCommand),
    ConfigCommand(ConfigCommand),
    /// An already-built `QueryExpr`; `into_query_expr` returns it unchanged.
    Ranking(QueryExpr),
}
56
/// Flat list of every SQL frontend command.
///
/// `SqlStatement::into_command` produces this from the categorised
/// `SqlStatement` shape, `into_statement` regroups it, and
/// `into_query_expr` lowers each variant to a `QueryExpr`. The section
/// comments below name the `SqlStatement` category that `into_statement`
/// assigns to each run of variants.
#[derive(Debug, Clone)]
pub enum SqlCommand {
    // --- regrouped as `SqlStatement::Query` ---
    Select(TableQuery),
    Join(JoinQuery),
    // --- regrouped as `SqlStatement::Mutation` ---
    Insert(InsertQuery),
    Update(UpdateQuery),
    Delete(DeleteQuery),
    // --- regrouped as `SqlStatement::Schema` ---
    ExplainAlter(ExplainAlterQuery),
    CreateTable(CreateTableQuery),
    CreateCollection(CreateCollectionQuery),
    CreateVector(CreateVectorQuery),
    DropTable(DropTableQuery),
    DropGraph(DropGraphQuery),
    DropVector(DropVectorQuery),
    DropDocument(DropDocumentQuery),
    DropKv(DropKvQuery),
    DropCollection(DropCollectionQuery),
    Truncate(TruncateQuery),
    AlterTable(AlterTableQuery),
    CreateIndex(CreateIndexQuery),
    DropIndex(DropIndexQuery),
    CreateTimeSeries(CreateTimeSeriesQuery),
    CreateMetric(CreateMetricQuery),
    AlterMetric(AlterMetricQuery),
    CreateSlo(CreateSloQuery),
    DropTimeSeries(DropTimeSeriesQuery),
    CreateQueue(CreateQueueQuery),
    AlterQueue(AlterQueueQuery),
    DropQueue(DropQueueQuery),
    CreateTree(CreateTreeQuery),
    DropTree(DropTreeQuery),
    /// Lowered to `QueryExpr::ProbabilisticCommand`.
    Probabilistic(ProbabilisticCommand),
    // --- regrouped as `SqlStatement::Admin` ---
    SetConfig {
        key: String,
        value: Value,
    },
    ShowConfig {
        prefix: Option<String>,
    },
    SetSecret {
        key: String,
        value: Value,
    },
    DeleteSecret {
        key: String,
    },
    ShowSecrets {
        prefix: Option<String>,
    },
    SetTenant(Option<String>),
    ShowTenant,
    TransactionControl(TxnControl),
    /// Lowered to `QueryExpr::MaintenanceCommand`.
    Maintenance(MaintenanceCommand),
    // --- regrouped as `SqlStatement::Schema` ---
    CreateSchema(CreateSchemaQuery),
    DropSchema(DropSchemaQuery),
    CreateSequence(CreateSequenceQuery),
    DropSequence(DropSequenceQuery),
    CopyFrom(CopyFromQuery),
    CreateView(CreateViewQuery),
    DropView(DropViewQuery),
    RefreshMaterializedView(RefreshMaterializedViewQuery),
    CreatePolicy(CreatePolicyQuery),
    DropPolicy(DropPolicyQuery),
    CreateServer(CreateServerQuery),
    DropServer(DropServerQuery),
    CreateForeignTable(CreateForeignTableQuery),
    DropForeignTable(DropForeignTableQuery),
    // --- regrouped as `SqlStatement::Admin` ---
    /// `GRANT … ON … TO …`
    Grant(GrantStmt),
    /// `REVOKE … ON … FROM …`
    Revoke(RevokeStmt),
    /// `ALTER USER name <attrs>`
    AlterUser(AlterUserStmt),
    /// IAM policy DDL (CREATE POLICY '...' AS '...', DROP POLICY '...',
    /// ATTACH/DETACH POLICY, SHOW POLICIES, SIMULATE, SHOW EFFECTIVE
    /// PERMISSIONS). Stored as a pre-built QueryExpr so the dispatcher
    /// can route the multitude of shapes through a single arm.
    IamPolicy(QueryExpr),
    // --- regrouped as `SqlStatement::Schema` ---
    CreateMigration(CreateMigrationQuery),
    ApplyMigration(ApplyMigrationQuery),
    RollbackMigration(RollbackMigrationQuery),
    ExplainMigration(ExplainMigrationQuery),
}
140
141/// Issue #789 — Analytics v0 non-goal map for `CREATE …` forms.
142///
143/// PRD #782 ringfences Analytics v0 around a metric-centric catalog and
144/// explicitly excludes generic analytics objects, a separate event
145/// storage model, cohorts, funnels, SLA contracts, and adapter surfaces.
146/// When the parser sees one of these idents in the `CREATE` head, return
147/// a stable v0-scoped rejection message; otherwise return `None` and let
148/// the regular CREATE fallback handle the token.
149fn analytics_v0_non_goal_create(token: &Token) -> Option<String> {
150    let ident = match token {
151        Token::Ident(s) => s,
152        _ => return None,
153    };
154    let upper = ident.to_ascii_uppercase();
155    let message = match upper.as_str() {
156        "ANALYTICS" => {
157            "CREATE ANALYTICS is not supported in Analytics v0 — \
158             use CREATE METRIC <dotted.path> for the metric-centric \
159             catalog (PRD #782 non-goal)"
160        }
161        "EVENT" => {
162            "CREATE EVENT is not supported in Analytics v0 — \
163             event-shaped data lives in ordinary TABLE/DOCUMENT \
164             collections, not a new storage model (PRD #782 non-goal)"
165        }
166        "COHORT" => {
167            "CREATE COHORT is not supported in Analytics v0 — \
168             cohort surfaces are deferred (PRD #782 non-goal)"
169        }
170        "FUNNEL" => {
171            "CREATE FUNNEL is not supported in Analytics v0 — \
172             funnel surfaces are deferred (PRD #782 non-goal)"
173        }
174        "SLA" => {
175            "CREATE SLA is not supported in Analytics v0 — \
176             SLA/legal/commercial contract modeling is post-MVP \
177             (PRD #782 non-goal)"
178        }
179        "ADAPTER" => {
180            "CREATE ADAPTER is not supported in Analytics v0 — \
181             Prometheus/Grafana/Snowplow/Google Analytics adapters \
182             are deferred (PRD #782 non-goal)"
183        }
184        _ => return None,
185    };
186    Some(message.to_string())
187}
188
189fn collection_model_filter(model: &str) -> Filter {
190    Filter::Compare {
191        field: FieldRef::column("", "model"),
192        op: CompareOp::Eq,
193        value: Value::Text(model.to_string().into()),
194    }
195}
196
197fn add_table_filter(query: &mut TableQuery, filter: Filter) {
198    let combined = match query.filter.take() {
199        Some(existing) => existing.and(filter),
200        None => filter,
201    };
202    query.where_expr = Some(filter_to_expr(&combined));
203    query.filter = Some(combined);
204}
205
206fn parse_show_collections_by_model(
207    parser: &mut Parser<'_>,
208    model: &str,
209) -> Result<TableQuery, ParseError> {
210    let mut query = TableQuery::new("red.collections");
211    parser.parse_table_clauses(&mut query)?;
212    add_table_filter(&mut query, collection_model_filter(model));
213    Ok(query)
214}
215
/// Payload of `SqlStatement::Query`.
///
/// `SqlStatement::into_command` maps each variant to the `SqlCommand`
/// variant of the same name.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum SqlQuery {
    /// A single-table query.
    Select(TableQuery),
    /// A join query.
    Join(JoinQuery),
}
222
/// Payload of `SqlStatement::Mutation`.
///
/// `SqlStatement::into_command` maps each variant to the `SqlCommand`
/// variant of the same name.
#[derive(Debug, Clone)]
pub enum SqlMutation {
    Insert(InsertQuery),
    Update(UpdateQuery),
    Delete(DeleteQuery),
}
229
/// Payload of `SqlStatement::Schema`.
///
/// Every variant has a same-named `SqlCommand` variant, and
/// `SqlStatement::into_command` moves the payload across unchanged.
/// Besides the CREATE / ALTER / DROP shapes, this category also carries
/// `ExplainAlter`, `Truncate`, `Probabilistic`, `CopyFrom`,
/// `RefreshMaterializedView`, and the migration commands.
#[derive(Debug, Clone)]
pub enum SqlSchemaCommand {
    ExplainAlter(ExplainAlterQuery),
    CreateTable(CreateTableQuery),
    CreateCollection(CreateCollectionQuery),
    CreateVector(CreateVectorQuery),
    DropTable(DropTableQuery),
    DropGraph(DropGraphQuery),
    DropVector(DropVectorQuery),
    DropDocument(DropDocumentQuery),
    DropKv(DropKvQuery),
    DropCollection(DropCollectionQuery),
    Truncate(TruncateQuery),
    AlterTable(AlterTableQuery),
    CreateIndex(CreateIndexQuery),
    DropIndex(DropIndexQuery),
    CreateTimeSeries(CreateTimeSeriesQuery),
    CreateMetric(CreateMetricQuery),
    AlterMetric(AlterMetricQuery),
    CreateSlo(CreateSloQuery),
    DropTimeSeries(DropTimeSeriesQuery),
    CreateQueue(CreateQueueQuery),
    AlterQueue(AlterQueueQuery),
    DropQueue(DropQueueQuery),
    CreateTree(CreateTreeQuery),
    DropTree(DropTreeQuery),
    Probabilistic(ProbabilisticCommand),
    CreateSchema(CreateSchemaQuery),
    DropSchema(DropSchemaQuery),
    CreateSequence(CreateSequenceQuery),
    DropSequence(DropSequenceQuery),
    CopyFrom(CopyFromQuery),
    CreateView(CreateViewQuery),
    DropView(DropViewQuery),
    RefreshMaterializedView(RefreshMaterializedViewQuery),
    CreatePolicy(CreatePolicyQuery),
    DropPolicy(DropPolicyQuery),
    CreateServer(CreateServerQuery),
    DropServer(DropServerQuery),
    CreateForeignTable(CreateForeignTableQuery),
    DropForeignTable(DropForeignTableQuery),
    CreateMigration(CreateMigrationQuery),
    ApplyMigration(ApplyMigrationQuery),
    RollbackMigration(RollbackMigrationQuery),
    ExplainMigration(ExplainMigrationQuery),
}
276
/// Payload of `SqlStatement::Admin`.
///
/// Every variant has a same-named `SqlCommand` variant with the same
/// fields, and `SqlStatement::into_command` moves the payload across
/// unchanged.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum SqlAdminCommand {
    SetConfig { key: String, value: Value },
    ShowConfig { prefix: Option<String> },
    SetSecret { key: String, value: Value },
    DeleteSecret { key: String },
    ShowSecrets { prefix: Option<String> },
    SetTenant(Option<String>),
    ShowTenant,
    TransactionControl(TxnControl),
    Maintenance(MaintenanceCommand),
    Grant(GrantStmt),
    Revoke(RevokeStmt),
    AlterUser(AlterUserStmt),
    /// A pre-built `QueryExpr`; `SqlCommand::into_query_expr` returns it
    /// as-is instead of wrapping it.
    IamPolicy(QueryExpr),
}
294
295impl SqlStatement {
296    pub fn into_command(self) -> SqlCommand {
297        match self {
298            SqlStatement::Query(SqlQuery::Select(query)) => SqlCommand::Select(query),
299            SqlStatement::Query(SqlQuery::Join(query)) => SqlCommand::Join(query),
300            SqlStatement::Mutation(SqlMutation::Insert(query)) => SqlCommand::Insert(query),
301            SqlStatement::Mutation(SqlMutation::Update(query)) => SqlCommand::Update(query),
302            SqlStatement::Mutation(SqlMutation::Delete(query)) => SqlCommand::Delete(query),
303            SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query)) => {
304                SqlCommand::ExplainAlter(query)
305            }
306            SqlStatement::Schema(SqlSchemaCommand::CreateTable(query)) => {
307                SqlCommand::CreateTable(query)
308            }
309            SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query)) => {
310                SqlCommand::CreateCollection(query)
311            }
312            SqlStatement::Schema(SqlSchemaCommand::CreateVector(query)) => {
313                SqlCommand::CreateVector(query)
314            }
315            SqlStatement::Schema(SqlSchemaCommand::DropTable(query)) => {
316                SqlCommand::DropTable(query)
317            }
318            SqlStatement::Schema(SqlSchemaCommand::DropGraph(query)) => {
319                SqlCommand::DropGraph(query)
320            }
321            SqlStatement::Schema(SqlSchemaCommand::DropVector(query)) => {
322                SqlCommand::DropVector(query)
323            }
324            SqlStatement::Schema(SqlSchemaCommand::DropDocument(query)) => {
325                SqlCommand::DropDocument(query)
326            }
327            SqlStatement::Schema(SqlSchemaCommand::DropKv(query)) => SqlCommand::DropKv(query),
328            SqlStatement::Schema(SqlSchemaCommand::DropCollection(query)) => {
329                SqlCommand::DropCollection(query)
330            }
331            SqlStatement::Schema(SqlSchemaCommand::Truncate(query)) => SqlCommand::Truncate(query),
332            SqlStatement::Schema(SqlSchemaCommand::AlterTable(query)) => {
333                SqlCommand::AlterTable(query)
334            }
335            SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query)) => {
336                SqlCommand::CreateIndex(query)
337            }
338            SqlStatement::Schema(SqlSchemaCommand::DropIndex(query)) => {
339                SqlCommand::DropIndex(query)
340            }
341            SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query)) => {
342                SqlCommand::CreateTimeSeries(query)
343            }
344            SqlStatement::Schema(SqlSchemaCommand::CreateMetric(query)) => {
345                SqlCommand::CreateMetric(query)
346            }
347            SqlStatement::Schema(SqlSchemaCommand::AlterMetric(query)) => {
348                SqlCommand::AlterMetric(query)
349            }
350            SqlStatement::Schema(SqlSchemaCommand::CreateSlo(query)) => {
351                SqlCommand::CreateSlo(query)
352            }
353            SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query)) => {
354                SqlCommand::DropTimeSeries(query)
355            }
356            SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query)) => {
357                SqlCommand::CreateQueue(query)
358            }
359            SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query)) => {
360                SqlCommand::AlterQueue(query)
361            }
362            SqlStatement::Schema(SqlSchemaCommand::DropQueue(query)) => {
363                SqlCommand::DropQueue(query)
364            }
365            SqlStatement::Schema(SqlSchemaCommand::CreateTree(query)) => {
366                SqlCommand::CreateTree(query)
367            }
368            SqlStatement::Schema(SqlSchemaCommand::DropTree(query)) => SqlCommand::DropTree(query),
369            SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command)) => {
370                SqlCommand::Probabilistic(command)
371            }
372            SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value }) => {
373                SqlCommand::SetConfig { key, value }
374            }
375            SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix }) => {
376                SqlCommand::ShowConfig { prefix }
377            }
378            SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value }) => {
379                SqlCommand::SetSecret { key, value }
380            }
381            SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key }) => {
382                SqlCommand::DeleteSecret { key }
383            }
384            SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix }) => {
385                SqlCommand::ShowSecrets { prefix }
386            }
387            SqlStatement::Admin(SqlAdminCommand::SetTenant(value)) => SqlCommand::SetTenant(value),
388            SqlStatement::Admin(SqlAdminCommand::ShowTenant) => SqlCommand::ShowTenant,
389            SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl)) => {
390                SqlCommand::TransactionControl(ctl)
391            }
392            SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)) => SqlCommand::Maintenance(cmd),
393            SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)) => SqlCommand::CreateSchema(q),
394            SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)) => SqlCommand::DropSchema(q),
395            SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q)) => {
396                SqlCommand::CreateSequence(q)
397            }
398            SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)) => SqlCommand::DropSequence(q),
399            SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)) => SqlCommand::CopyFrom(q),
400            SqlStatement::Schema(SqlSchemaCommand::CreateView(q)) => SqlCommand::CreateView(q),
401            SqlStatement::Schema(SqlSchemaCommand::DropView(q)) => SqlCommand::DropView(q),
402            SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q)) => {
403                SqlCommand::RefreshMaterializedView(q)
404            }
405            SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)) => SqlCommand::CreatePolicy(q),
406            SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)) => SqlCommand::DropPolicy(q),
407            SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)) => SqlCommand::CreateServer(q),
408            SqlStatement::Schema(SqlSchemaCommand::DropServer(q)) => SqlCommand::DropServer(q),
409            SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q)) => {
410                SqlCommand::CreateForeignTable(q)
411            }
412            SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q)) => {
413                SqlCommand::DropForeignTable(q)
414            }
415            SqlStatement::Admin(SqlAdminCommand::Grant(s)) => SqlCommand::Grant(s),
416            SqlStatement::Admin(SqlAdminCommand::Revoke(s)) => SqlCommand::Revoke(s),
417            SqlStatement::Admin(SqlAdminCommand::AlterUser(s)) => SqlCommand::AlterUser(s),
418            SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)) => SqlCommand::IamPolicy(e),
419            SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q)) => {
420                SqlCommand::CreateMigration(q)
421            }
422            SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q)) => {
423                SqlCommand::ApplyMigration(q)
424            }
425            SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q)) => {
426                SqlCommand::RollbackMigration(q)
427            }
428            SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q)) => {
429                SqlCommand::ExplainMigration(q)
430            }
431        }
432    }
433
434    pub fn into_query_expr(self) -> QueryExpr {
435        self.into_command().into_query_expr()
436    }
437}
438
439impl FrontendStatement {
440    pub fn into_query_expr(self) -> QueryExpr {
441        match self {
442            FrontendStatement::Sql(statement) => statement.into_query_expr(),
443            FrontendStatement::Graph(query) => QueryExpr::Graph(query),
444            FrontendStatement::GraphCommand(command) => QueryExpr::GraphCommand(command),
445            FrontendStatement::Path(query) => QueryExpr::Path(query),
446            FrontendStatement::Vector(query) => QueryExpr::Vector(query),
447            FrontendStatement::Hybrid(query) => QueryExpr::Hybrid(query),
448            FrontendStatement::Search(command) => QueryExpr::SearchCommand(command),
449            FrontendStatement::Ask(query) => QueryExpr::Ask(query),
450            FrontendStatement::QueueSelect(query) => QueryExpr::QueueSelect(query),
451            FrontendStatement::QueueCommand(command) => QueryExpr::QueueCommand(command),
452            FrontendStatement::EventsBackfill(query) => QueryExpr::EventsBackfill(query),
453            FrontendStatement::EventsBackfillStatus { collection } => {
454                QueryExpr::EventsBackfillStatus { collection }
455            }
456            FrontendStatement::TreeCommand(command) => QueryExpr::TreeCommand(command),
457            FrontendStatement::ProbabilisticCommand(command) => {
458                QueryExpr::ProbabilisticCommand(command)
459            }
460            FrontendStatement::KvCommand(command) => QueryExpr::KvCommand(command),
461            FrontendStatement::ConfigCommand(command) => QueryExpr::ConfigCommand(command),
462            FrontendStatement::Ranking(expr) => expr,
463        }
464    }
465}
466
467pub fn parse_frontend(input: &str) -> Result<FrontendStatement, ParseError> {
468    let mut parser = Parser::new(input)?;
469    let statement = parser.parse_frontend_statement()?;
470    if !parser.check(&Token::Eof) {
471        return Err(ParseError::new(
472            // F-05: `Token::Ident` / `Token::String` / `Token::JsonLiteral`
473            // Display arms emit raw user bytes. Render via `{:?}` so
474            // embedded CR/LF/NUL/quotes are escaped before the message
475            // reaches downstream JSON / audit / log / gRPC sinks.
476            format!("Unexpected token after query: {:?}", parser.current.token),
477            parser.position(),
478        ));
479    }
480    Ok(statement)
481}
482
483impl SqlCommand {
484    pub fn into_query_expr(self) -> QueryExpr {
485        match self {
486            SqlCommand::Select(query) => QueryExpr::Table(query),
487            SqlCommand::Join(query) => QueryExpr::Join(query),
488            SqlCommand::Insert(query) => QueryExpr::Insert(query),
489            SqlCommand::Update(query) => QueryExpr::Update(query),
490            SqlCommand::Delete(query) => QueryExpr::Delete(query),
491            SqlCommand::ExplainAlter(query) => QueryExpr::ExplainAlter(query),
492            SqlCommand::CreateTable(query) => QueryExpr::CreateTable(query),
493            SqlCommand::CreateCollection(query) => QueryExpr::CreateCollection(query),
494            SqlCommand::CreateVector(query) => QueryExpr::CreateVector(query),
495            SqlCommand::DropTable(query) => QueryExpr::DropTable(query),
496            SqlCommand::DropGraph(query) => QueryExpr::DropGraph(query),
497            SqlCommand::DropVector(query) => QueryExpr::DropVector(query),
498            SqlCommand::DropDocument(query) => QueryExpr::DropDocument(query),
499            SqlCommand::DropKv(query) => QueryExpr::DropKv(query),
500            SqlCommand::DropCollection(query) => QueryExpr::DropCollection(query),
501            SqlCommand::Truncate(query) => QueryExpr::Truncate(query),
502            SqlCommand::AlterTable(query) => QueryExpr::AlterTable(query),
503            SqlCommand::CreateIndex(query) => QueryExpr::CreateIndex(query),
504            SqlCommand::DropIndex(query) => QueryExpr::DropIndex(query),
505            SqlCommand::CreateTimeSeries(query) => QueryExpr::CreateTimeSeries(query),
506            SqlCommand::CreateMetric(query) => QueryExpr::CreateMetric(query),
507            SqlCommand::AlterMetric(query) => QueryExpr::AlterMetric(query),
508            SqlCommand::CreateSlo(query) => QueryExpr::CreateSlo(query),
509            SqlCommand::DropTimeSeries(query) => QueryExpr::DropTimeSeries(query),
510            SqlCommand::CreateQueue(query) => QueryExpr::CreateQueue(query),
511            SqlCommand::AlterQueue(query) => QueryExpr::AlterQueue(query),
512            SqlCommand::DropQueue(query) => QueryExpr::DropQueue(query),
513            SqlCommand::CreateTree(query) => QueryExpr::CreateTree(query),
514            SqlCommand::DropTree(query) => QueryExpr::DropTree(query),
515            SqlCommand::Probabilistic(command) => QueryExpr::ProbabilisticCommand(command),
516            SqlCommand::SetConfig { key, value } => QueryExpr::SetConfig { key, value },
517            SqlCommand::ShowConfig { prefix } => QueryExpr::ShowConfig { prefix },
518            SqlCommand::SetSecret { key, value } => QueryExpr::SetSecret { key, value },
519            SqlCommand::DeleteSecret { key } => QueryExpr::DeleteSecret { key },
520            SqlCommand::ShowSecrets { prefix } => QueryExpr::ShowSecrets { prefix },
521            SqlCommand::SetTenant(value) => QueryExpr::SetTenant(value),
522            SqlCommand::ShowTenant => QueryExpr::ShowTenant,
523            SqlCommand::TransactionControl(ctl) => QueryExpr::TransactionControl(ctl),
524            SqlCommand::Maintenance(cmd) => QueryExpr::MaintenanceCommand(cmd),
525            SqlCommand::CreateSchema(q) => QueryExpr::CreateSchema(q),
526            SqlCommand::DropSchema(q) => QueryExpr::DropSchema(q),
527            SqlCommand::CreateSequence(q) => QueryExpr::CreateSequence(q),
528            SqlCommand::DropSequence(q) => QueryExpr::DropSequence(q),
529            SqlCommand::CopyFrom(q) => QueryExpr::CopyFrom(q),
530            SqlCommand::CreateView(q) => QueryExpr::CreateView(q),
531            SqlCommand::DropView(q) => QueryExpr::DropView(q),
532            SqlCommand::RefreshMaterializedView(q) => QueryExpr::RefreshMaterializedView(q),
533            SqlCommand::CreatePolicy(q) => QueryExpr::CreatePolicy(q),
534            SqlCommand::DropPolicy(q) => QueryExpr::DropPolicy(q),
535            SqlCommand::CreateServer(q) => QueryExpr::CreateServer(q),
536            SqlCommand::DropServer(q) => QueryExpr::DropServer(q),
537            SqlCommand::CreateForeignTable(q) => QueryExpr::CreateForeignTable(q),
538            SqlCommand::DropForeignTable(q) => QueryExpr::DropForeignTable(q),
539            SqlCommand::Grant(s) => QueryExpr::Grant(s),
540            SqlCommand::Revoke(s) => QueryExpr::Revoke(s),
541            SqlCommand::AlterUser(s) => QueryExpr::AlterUser(s),
542            SqlCommand::IamPolicy(e) => e,
543            SqlCommand::CreateMigration(q) => QueryExpr::CreateMigration(q),
544            SqlCommand::ApplyMigration(q) => QueryExpr::ApplyMigration(q),
545            SqlCommand::RollbackMigration(q) => QueryExpr::RollbackMigration(q),
546            SqlCommand::ExplainMigration(q) => QueryExpr::ExplainMigration(q),
547        }
548    }
549
550    pub fn into_statement(self) -> SqlStatement {
551        match self {
552            SqlCommand::Select(query) => SqlStatement::Query(SqlQuery::Select(query)),
553            SqlCommand::Join(query) => SqlStatement::Query(SqlQuery::Join(query)),
554            SqlCommand::Insert(query) => SqlStatement::Mutation(SqlMutation::Insert(query)),
555            SqlCommand::Update(query) => SqlStatement::Mutation(SqlMutation::Update(query)),
556            SqlCommand::Delete(query) => SqlStatement::Mutation(SqlMutation::Delete(query)),
557            SqlCommand::ExplainAlter(query) => {
558                SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query))
559            }
560            SqlCommand::CreateTable(query) => {
561                SqlStatement::Schema(SqlSchemaCommand::CreateTable(query))
562            }
563            SqlCommand::CreateCollection(query) => {
564                SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query))
565            }
566            SqlCommand::CreateVector(query) => {
567                SqlStatement::Schema(SqlSchemaCommand::CreateVector(query))
568            }
569            SqlCommand::DropTable(query) => {
570                SqlStatement::Schema(SqlSchemaCommand::DropTable(query))
571            }
572            SqlCommand::DropGraph(query) => {
573                SqlStatement::Schema(SqlSchemaCommand::DropGraph(query))
574            }
575            SqlCommand::DropVector(query) => {
576                SqlStatement::Schema(SqlSchemaCommand::DropVector(query))
577            }
578            SqlCommand::DropDocument(query) => {
579                SqlStatement::Schema(SqlSchemaCommand::DropDocument(query))
580            }
581            SqlCommand::DropKv(query) => SqlStatement::Schema(SqlSchemaCommand::DropKv(query)),
582            SqlCommand::DropCollection(query) => {
583                SqlStatement::Schema(SqlSchemaCommand::DropCollection(query))
584            }
585            SqlCommand::Truncate(query) => SqlStatement::Schema(SqlSchemaCommand::Truncate(query)),
586            SqlCommand::AlterTable(query) => {
587                SqlStatement::Schema(SqlSchemaCommand::AlterTable(query))
588            }
589            SqlCommand::CreateIndex(query) => {
590                SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query))
591            }
592            SqlCommand::DropIndex(query) => {
593                SqlStatement::Schema(SqlSchemaCommand::DropIndex(query))
594            }
595            SqlCommand::CreateTimeSeries(query) => {
596                SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query))
597            }
598            SqlCommand::CreateMetric(query) => {
599                SqlStatement::Schema(SqlSchemaCommand::CreateMetric(query))
600            }
601            SqlCommand::AlterMetric(query) => {
602                SqlStatement::Schema(SqlSchemaCommand::AlterMetric(query))
603            }
604            SqlCommand::CreateSlo(query) => {
605                SqlStatement::Schema(SqlSchemaCommand::CreateSlo(query))
606            }
607            SqlCommand::DropTimeSeries(query) => {
608                SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query))
609            }
610            SqlCommand::CreateQueue(query) => {
611                SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query))
612            }
613            SqlCommand::AlterQueue(query) => {
614                SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query))
615            }
616            SqlCommand::DropQueue(query) => {
617                SqlStatement::Schema(SqlSchemaCommand::DropQueue(query))
618            }
619            SqlCommand::CreateTree(query) => {
620                SqlStatement::Schema(SqlSchemaCommand::CreateTree(query))
621            }
622            SqlCommand::DropTree(query) => SqlStatement::Schema(SqlSchemaCommand::DropTree(query)),
623            SqlCommand::Probabilistic(command) => {
624                SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command))
625            }
626            SqlCommand::SetConfig { key, value } => {
627                SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value })
628            }
629            SqlCommand::ShowConfig { prefix } => {
630                SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix })
631            }
632            SqlCommand::SetSecret { key, value } => {
633                SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value })
634            }
635            SqlCommand::DeleteSecret { key } => {
636                SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key })
637            }
638            SqlCommand::ShowSecrets { prefix } => {
639                SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix })
640            }
641            SqlCommand::SetTenant(value) => SqlStatement::Admin(SqlAdminCommand::SetTenant(value)),
642            SqlCommand::ShowTenant => SqlStatement::Admin(SqlAdminCommand::ShowTenant),
643            SqlCommand::TransactionControl(ctl) => {
644                SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl))
645            }
646            SqlCommand::Maintenance(cmd) => SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)),
647            SqlCommand::CreateSchema(q) => SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)),
648            SqlCommand::DropSchema(q) => SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)),
649            SqlCommand::CreateSequence(q) => {
650                SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q))
651            }
652            SqlCommand::DropSequence(q) => SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)),
653            SqlCommand::CopyFrom(q) => SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)),
654            SqlCommand::CreateView(q) => SqlStatement::Schema(SqlSchemaCommand::CreateView(q)),
655            SqlCommand::DropView(q) => SqlStatement::Schema(SqlSchemaCommand::DropView(q)),
656            SqlCommand::RefreshMaterializedView(q) => {
657                SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q))
658            }
659            SqlCommand::CreatePolicy(q) => SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)),
660            SqlCommand::DropPolicy(q) => SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)),
661            SqlCommand::CreateServer(q) => SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)),
662            SqlCommand::DropServer(q) => SqlStatement::Schema(SqlSchemaCommand::DropServer(q)),
663            SqlCommand::CreateForeignTable(q) => {
664                SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q))
665            }
666            SqlCommand::DropForeignTable(q) => {
667                SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q))
668            }
669            SqlCommand::Grant(s) => SqlStatement::Admin(SqlAdminCommand::Grant(s)),
670            SqlCommand::Revoke(s) => SqlStatement::Admin(SqlAdminCommand::Revoke(s)),
671            SqlCommand::AlterUser(s) => SqlStatement::Admin(SqlAdminCommand::AlterUser(s)),
672            SqlCommand::IamPolicy(e) => SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)),
673            SqlCommand::CreateMigration(q) => {
674                SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q))
675            }
676            SqlCommand::ApplyMigration(q) => {
677                SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q))
678            }
679            SqlCommand::RollbackMigration(q) => {
680                SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q))
681            }
682            SqlCommand::ExplainMigration(q) => {
683                SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q))
684            }
685        }
686    }
687}
688
689impl<'a> Parser<'a> {
690    fn parse_events_command(&mut self) -> Result<QueryExpr, ParseError> {
691        self.expect_ident()?; // EVENTS
692        if self.consume_ident_ci("STATUS")? {
693            let mut query = TableQuery::new("red.subscriptions");
694            let collection = match self.peek().clone() {
695                Token::Ident(name) => {
696                    self.advance()?;
697                    Some(name)
698                }
699                Token::String(name) => {
700                    self.advance()?;
701                    Some(name)
702                }
703                _ => None,
704            };
705            self.parse_table_clauses(&mut query)?;
706            if let Some(collection) = collection {
707                let filter = Filter::compare(
708                    FieldRef::column("red.subscriptions", "collection"),
709                    CompareOp::Eq,
710                    Value::text(collection),
711                );
712                let expr = filter_to_expr(&filter);
713                query.where_expr = Some(match query.where_expr.take() {
714                    Some(existing) => Expr::binop(BinOp::And, existing, expr),
715                    None => expr,
716                });
717                query.filter = Some(match query.filter.take() {
718                    Some(existing) => existing.and(filter),
719                    None => filter,
720                });
721            }
722            return Ok(QueryExpr::Table(query));
723        }
724
725        if !self.consume_ident_ci("BACKFILL")? {
726            return Err(ParseError::expected(
727                vec!["BACKFILL", "STATUS"],
728                self.peek(),
729                self.position(),
730            ));
731        }
732
733        if self.consume_ident_ci("STATUS")? {
734            let collection = self.expect_ident()?;
735            return Ok(QueryExpr::EventsBackfillStatus { collection });
736        }
737
738        let collection = self.expect_ident()?;
739        let where_filter = if self.consume(&Token::Where)? {
740            let mut parts = Vec::new();
741            while !self.check(&Token::Eof) && !self.check(&Token::To) {
742                parts.push(self.peek().to_string());
743                self.advance()?;
744            }
745            if parts.is_empty() {
746                return Err(ParseError::expected(
747                    vec!["predicate"],
748                    self.peek(),
749                    self.position(),
750                ));
751            }
752            Some(parts.join(" "))
753        } else {
754            None
755        };
756
757        self.expect(Token::To)?;
758        let target_queue = self.expect_ident()?;
759        let limit = if self.consume(&Token::Limit)? {
760            Some(self.parse_positive_integer("LIMIT")? as u64)
761        } else {
762            None
763        };
764
765        Ok(QueryExpr::EventsBackfill(EventsBackfillQuery {
766            collection,
767            where_filter,
768            target_queue,
769            limit,
770        }))
771    }
772
773    /// Parse an optional `OPTIONS (key 'value', key2 'value2', ...)` clause
774    /// used by Phase 3.2 FDW DDL statements. Returns an empty vec when the
775    /// clause is absent. Values are always single-quoted string literals —
776    /// consistent with PG's generic-options model.
777    pub(crate) fn parse_fdw_options_clause(&mut self) -> Result<Vec<(String, String)>, ParseError> {
778        if !self.consume(&Token::Options)? {
779            return Ok(Vec::new());
780        }
781        self.expect(Token::LParen)?;
782        let mut out: Vec<(String, String)> = Vec::new();
783        loop {
784            // Option keys frequently collide with reserved words
785            // (`path`, `format`, `delimiter`, `header`, …) — accept
786            // the keyword form and lowercase it so downstream
787            // option-name matching stays case-insensitive.
788            let was_ident = matches!(self.peek(), Token::Ident(_));
789            let raw = self.expect_ident_or_keyword()?;
790            let key = if was_ident {
791                raw
792            } else {
793                raw.to_ascii_lowercase()
794            };
795            // Value is a single-quoted string literal.
796            let value = self.parse_string()?;
797            out.push((key, value));
798            if !self.consume(&Token::Comma)? {
799                break;
800            }
801        }
802        self.expect(Token::RParen)?;
803        Ok(out)
804    }
805
806    /// Parse any top-level frontend statement through a single shared surface.
807    pub fn parse_frontend_statement(&mut self) -> Result<FrontendStatement, ParseError> {
808        match self.peek() {
809            Token::Select => match self.parse_select_query()? {
810                QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
811                    SqlQuery::Select(query),
812                ))),
813                QueryExpr::Join(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
814                    SqlQuery::Join(query),
815                ))),
816                QueryExpr::QueueSelect(query) => Ok(FrontendStatement::QueueSelect(query)),
817                other => Err(ParseError::new(
818                    format!("internal: SELECT produced unexpected query kind {other:?}"),
819                    self.position(),
820                )),
821            },
822            Token::From
823            | Token::Insert
824            | Token::Update
825            | Token::Truncate
826            | Token::Create
827            | Token::Drop
828            | Token::Alter
829            | Token::Set
830            | Token::Begin
831            | Token::Commit
832            | Token::Rollback
833            | Token::Savepoint
834            | Token::Release
835            | Token::Start
836            | Token::Vacuum
837            | Token::Analyze
838            | Token::Copy
839            | Token::Refresh => self.parse_sql_statement().map(FrontendStatement::Sql),
840            Token::Explain => {
841                if matches!(
842                    self.peek_next()?,
843                    Token::Ident(name) if name.eq_ignore_ascii_case("ASK")
844                ) {
845                    match self.parse_explain_ask_query()? {
846                        QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
847                        other => Err(ParseError::new(
848                            format!(
849                                "internal: EXPLAIN ASK produced unexpected query kind {other:?}"
850                            ),
851                            self.position(),
852                        )),
853                    }
854                } else {
855                    self.parse_sql_statement().map(FrontendStatement::Sql)
856                }
857            }
858            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
859                self.parse_sql_statement().map(FrontendStatement::Sql)
860            }
861            Token::Ident(name)
862                if name.eq_ignore_ascii_case("RANK")
863                    || name.eq_ignore_ascii_case("APPROX")
864                    || name.eq_ignore_ascii_case("APPROXIMATE")
865                    || name.eq_ignore_ascii_case("ZRANK")
866                    || name.eq_ignore_ascii_case("ZRANGE") =>
867            {
868                self.parse_ranking_read().map(FrontendStatement::Ranking)
869            }
870            Token::Desc => self.parse_sql_statement().map(FrontendStatement::Sql),
871            Token::Ident(name)
872                if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
873            {
874                self.parse_sql_statement().map(FrontendStatement::Sql)
875            }
876            Token::Ident(name)
877                if name.eq_ignore_ascii_case("GRANT")
878                    || name.eq_ignore_ascii_case("REVOKE")
879                    || name.eq_ignore_ascii_case("SIMULATE")
880                    || name.eq_ignore_ascii_case("LINT")
881                    || name.eq_ignore_ascii_case("MIGRATE")
882                    || name.eq_ignore_ascii_case("APPLY") =>
883            {
884                self.parse_sql_statement().map(FrontendStatement::Sql)
885            }
886            Token::Ident(name) if name.eq_ignore_ascii_case("WATCH") => {
887                self.advance()?;
888                if matches!(
889                    self.peek(),
890                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
891                ) {
892                    match self.parse_config_watch_after_watch()? {
893                        QueryExpr::ConfigCommand(command) => {
894                            Ok(FrontendStatement::ConfigCommand(command))
895                        }
896                        other => Err(ParseError::new(
897                            format!(
898                                "internal: WATCH CONFIG produced unexpected query kind {other:?}"
899                            ),
900                            self.position(),
901                        )),
902                    }
903                } else if matches!(
904                    self.peek(),
905                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
906                ) {
907                    match self.parse_vault_watch_after_watch()? {
908                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
909                        other => Err(ParseError::new(
910                            format!(
911                                "internal: WATCH VAULT produced unexpected query kind {other:?}"
912                            ),
913                            self.position(),
914                        )),
915                    }
916                } else {
917                    match self.parse_kv_watch(crate::catalog::CollectionModel::Kv)? {
918                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
919                        other => Err(ParseError::new(
920                            format!("internal: WATCH produced unexpected query kind {other:?}"),
921                            self.position(),
922                        )),
923                    }
924                }
925            }
926            Token::List => {
927                self.advance()?;
928                if matches!(
929                    self.peek(),
930                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
931                ) {
932                    match self.parse_config_list_after_list()? {
933                        QueryExpr::ConfigCommand(command) => {
934                            Ok(FrontendStatement::ConfigCommand(command))
935                        }
936                        other => Err(ParseError::new(
937                            format!(
938                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
939                            ),
940                            self.position(),
941                        )),
942                    }
943                } else if matches!(
944                    self.peek(),
945                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
946                ) {
947                    match self.parse_vault_list_after_list()? {
948                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
949                        other => Err(ParseError::new(
950                            format!(
951                                "internal: LIST VAULT produced unexpected query kind {other:?}"
952                            ),
953                            self.position(),
954                        )),
955                    }
956                } else {
957                    Err(ParseError::expected(
958                        vec!["CONFIG", "VAULT"],
959                        self.peek(),
960                        self.position(),
961                    ))
962                }
963            }
964            Token::Ident(name) if name.eq_ignore_ascii_case("LIST") => {
965                self.advance()?;
966                if matches!(
967                    self.peek(),
968                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
969                ) {
970                    match self.parse_config_list_after_list()? {
971                        QueryExpr::ConfigCommand(command) => {
972                            Ok(FrontendStatement::ConfigCommand(command))
973                        }
974                        other => Err(ParseError::new(
975                            format!(
976                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
977                            ),
978                            self.position(),
979                        )),
980                    }
981                } else if matches!(
982                    self.peek(),
983                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
984                ) {
985                    match self.parse_vault_list_after_list()? {
986                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
987                        other => Err(ParseError::new(
988                            format!(
989                                "internal: LIST VAULT produced unexpected query kind {other:?}"
990                            ),
991                            self.position(),
992                        )),
993                    }
994                } else {
995                    Err(ParseError::expected(
996                        vec!["CONFIG", "VAULT"],
997                        self.peek(),
998                        self.position(),
999                    ))
1000                }
1001            }
1002            Token::Ident(name) if name.eq_ignore_ascii_case("INVALIDATE") => {
1003                if matches!(
1004                    self.peek_next()?,
1005                    Token::Ident(next) if next.eq_ignore_ascii_case("CONFIG")
1006                ) {
1007                    match self.parse_config_command()? {
1008                        QueryExpr::ConfigCommand(command) => {
1009                            Ok(FrontendStatement::ConfigCommand(command))
1010                        }
1011                        other => Err(ParseError::new(
1012                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1013                            self.position(),
1014                        )),
1015                    }
1016                } else {
1017                    self.advance()?;
1018                    match self.parse_kv_invalidate_tags_after_invalidate()? {
1019                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1020                        other => Err(ParseError::new(
1021                            format!(
1022                                "internal: INVALIDATE produced unexpected query kind {other:?}"
1023                            ),
1024                            self.position(),
1025                        )),
1026                    }
1027                }
1028            }
1029            Token::Attach | Token::Detach => self.parse_sql_statement().map(FrontendStatement::Sql),
1030            Token::Match => match self.parse_match_query()? {
1031                QueryExpr::Graph(query) => Ok(FrontendStatement::Graph(query)),
1032                other => Err(ParseError::new(
1033                    format!("internal: MATCH produced unexpected query kind {other:?}"),
1034                    self.position(),
1035                )),
1036            },
1037            Token::Path => match self.parse_path_query()? {
1038                QueryExpr::Path(query) => Ok(FrontendStatement::Path(query)),
1039                other => Err(ParseError::new(
1040                    format!("internal: PATH produced unexpected query kind {other:?}"),
1041                    self.position(),
1042                )),
1043            },
1044            Token::Vector => match self.parse_vector_query()? {
1045                QueryExpr::Vector(query) => Ok(FrontendStatement::Vector(query)),
1046                other => Err(ParseError::new(
1047                    format!("internal: VECTOR produced unexpected query kind {other:?}"),
1048                    self.position(),
1049                )),
1050            },
1051            Token::Hybrid => match self.parse_hybrid_query()? {
1052                QueryExpr::Hybrid(query) => Ok(FrontendStatement::Hybrid(query)),
1053                other => Err(ParseError::new(
1054                    format!("internal: HYBRID produced unexpected query kind {other:?}"),
1055                    self.position(),
1056                )),
1057            },
1058            Token::Graph => match self.parse_graph_command()? {
1059                QueryExpr::GraphCommand(command) => Ok(FrontendStatement::GraphCommand(command)),
1060                other => Err(ParseError::new(
1061                    format!("internal: GRAPH produced unexpected query kind {other:?}"),
1062                    self.position(),
1063                )),
1064            },
1065            Token::Search => match self.parse_search_command()? {
1066                QueryExpr::SearchCommand(command) => Ok(FrontendStatement::Search(command)),
1067                other => Err(ParseError::new(
1068                    format!("internal: SEARCH produced unexpected query kind {other:?}"),
1069                    self.position(),
1070                )),
1071            },
1072            Token::Ident(name) if name.eq_ignore_ascii_case("ASK") => {
1073                match self.parse_ask_query()? {
1074                    QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
1075                    other => Err(ParseError::new(
1076                        format!("internal: ASK produced unexpected query kind {other:?}"),
1077                        self.position(),
1078                    )),
1079                }
1080            }
1081            Token::Ident(name) if name.eq_ignore_ascii_case("UNSEAL") => {
1082                match self.parse_unseal_vault_command()? {
1083                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1084                    other => Err(ParseError::new(
1085                        format!("internal: UNSEAL VAULT produced unexpected query kind {other:?}"),
1086                        self.position(),
1087                    )),
1088                }
1089            }
1090            Token::Queue => match self.parse_queue_command()? {
1091                QueryExpr::QueueCommand(command) => Ok(FrontendStatement::QueueCommand(command)),
1092                other => Err(ParseError::new(
1093                    format!("internal: QUEUE produced unexpected query kind {other:?}"),
1094                    self.position(),
1095                )),
1096            },
1097            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
1098                match self.parse_events_command()? {
1099                    QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
1100                        SqlQuery::Select(query),
1101                    ))),
1102                    QueryExpr::EventsBackfill(query) => {
1103                        Ok(FrontendStatement::EventsBackfill(query))
1104                    }
1105                    QueryExpr::EventsBackfillStatus { collection } => {
1106                        Ok(FrontendStatement::EventsBackfillStatus { collection })
1107                    }
1108                    other => Err(ParseError::new(
1109                        format!("internal: EVENTS produced unexpected query kind {other:?}"),
1110                        self.position(),
1111                    )),
1112                }
1113            }
1114            Token::Kv => match self.parse_kv_command()? {
1115                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1116                other => Err(ParseError::new(
1117                    format!("internal: KV produced unexpected query kind {other:?}"),
1118                    self.position(),
1119                )),
1120            },
1121            Token::Delete => {
1122                if matches!(
1123                    self.peek_next()?,
1124                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
1125                ) {
1126                    match self.parse_config_command()? {
1127                        QueryExpr::ConfigCommand(command) => {
1128                            Ok(FrontendStatement::ConfigCommand(command))
1129                        }
1130                        other => Err(ParseError::new(
1131                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1132                            self.position(),
1133                        )),
1134                    }
1135                } else if matches!(
1136                    self.peek_next()?,
1137                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
1138                ) {
1139                    match self.parse_vault_lifecycle_command()? {
1140                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1141                        other => Err(ParseError::new(
1142                            format!("internal: VAULT produced unexpected query kind {other:?}"),
1143                            self.position(),
1144                        )),
1145                    }
1146                } else {
1147                    self.parse_sql_statement().map(FrontendStatement::Sql)
1148                }
1149            }
1150            Token::Add => match self.parse_config_command()? {
1151                QueryExpr::ConfigCommand(command) => Ok(FrontendStatement::ConfigCommand(command)),
1152                other => Err(ParseError::new(
1153                    format!("internal: CONFIG produced unexpected query kind {other:?}"),
1154                    self.position(),
1155                )),
1156            },
1157            Token::Purge => match self.parse_vault_lifecycle_command()? {
1158                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1159                other => Err(ParseError::new(
1160                    format!("internal: VAULT produced unexpected query kind {other:?}"),
1161                    self.position(),
1162                )),
1163            },
1164            Token::Ident(name)
1165                if name.eq_ignore_ascii_case("PUT")
1166                    || name.eq_ignore_ascii_case("GET")
1167                    || name.eq_ignore_ascii_case("RESOLVE")
1168                    || name.eq_ignore_ascii_case("ROTATE")
1169                    || name.eq_ignore_ascii_case("HISTORY")
1170                    || name.eq_ignore_ascii_case("PURGE")
1171                    || name.eq_ignore_ascii_case("INCR")
1172                    || name.eq_ignore_ascii_case("DECR")
1173                    || name.eq_ignore_ascii_case("INVALIDATE") =>
1174            {
1175                if matches!(
1176                    self.peek_next()?,
1177                    Token::Ident(next) if next.eq_ignore_ascii_case("VAULT")
1178                ) {
1179                    match self.parse_vault_lifecycle_command()? {
1180                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1181                        other => Err(ParseError::new(
1182                            format!("internal: VAULT produced unexpected query kind {other:?}"),
1183                            self.position(),
1184                        )),
1185                    }
1186                } else {
1187                    match self.parse_config_command()? {
1188                        QueryExpr::ConfigCommand(command) => {
1189                            Ok(FrontendStatement::ConfigCommand(command))
1190                        }
1191                        other => Err(ParseError::new(
1192                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1193                            self.position(),
1194                        )),
1195                    }
1196                }
1197            }
1198            Token::Ident(name) if name.eq_ignore_ascii_case("VAULT") => {
1199                match self.parse_vault_command()? {
1200                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1201                    other => Err(ParseError::new(
1202                        format!("internal: VAULT produced unexpected query kind {other:?}"),
1203                        self.position(),
1204                    )),
1205                }
1206            }
1207            Token::Tree => match self.parse_tree_command()? {
1208                QueryExpr::TreeCommand(command) => Ok(FrontendStatement::TreeCommand(command)),
1209                other => Err(ParseError::new(
1210                    format!("internal: TREE produced unexpected query kind {other:?}"),
1211                    self.position(),
1212                )),
1213            },
1214            Token::Ident(name) if name.eq_ignore_ascii_case("HLL") => {
1215                match self.parse_hll_command()? {
1216                    QueryExpr::ProbabilisticCommand(command) => {
1217                        Ok(FrontendStatement::ProbabilisticCommand(command))
1218                    }
1219                    other => Err(ParseError::new(
1220                        format!("internal: HLL produced unexpected query kind {other:?}"),
1221                        self.position(),
1222                    )),
1223                }
1224            }
1225            Token::Ident(name) if name.eq_ignore_ascii_case("SKETCH") => {
1226                match self.parse_sketch_command()? {
1227                    QueryExpr::ProbabilisticCommand(command) => {
1228                        Ok(FrontendStatement::ProbabilisticCommand(command))
1229                    }
1230                    other => Err(ParseError::new(
1231                        format!("internal: SKETCH produced unexpected query kind {other:?}"),
1232                        self.position(),
1233                    )),
1234                }
1235            }
1236            Token::Ident(name) if name.eq_ignore_ascii_case("FILTER") => {
1237                match self.parse_filter_command()? {
1238                    QueryExpr::ProbabilisticCommand(command) => {
1239                        Ok(FrontendStatement::ProbabilisticCommand(command))
1240                    }
1241                    other => Err(ParseError::new(
1242                        format!("internal: FILTER produced unexpected query kind {other:?}"),
1243                        self.position(),
1244                    )),
1245                }
1246            }
1247            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => self
1248                .parse_sql_command()
1249                .map(SqlCommand::into_statement)
1250                .map(FrontendStatement::Sql),
1251            other => Err(ParseError::expected(
1252                vec![
1253                    "SELECT", "MATCH", "PATH", "FROM", "VECTOR", "HYBRID", "INSERT", "UPDATE",
1254                    "DELETE", "TRUNCATE", "CREATE", "DROP", "ALTER", "GRAPH", "SEARCH", "ASK",
1255                    "QUEUE", "EVENTS", "KV", "HLL", "TREE", "SKETCH", "FILTER", "SET", "SHOW",
1256                    "DESCRIBE", "DESC", "RANK", "ZRANK", "ZRANGE",
1257                ],
1258                other,
1259                self.position(),
1260            )),
1261        }
1262    }
1263
1264    fn parse_ranking_read(&mut self) -> Result<QueryExpr, ParseError> {
1265        let head = self.expect_ident()?;
1266        if head.eq_ignore_ascii_case("RANK") {
1267            return self.parse_rank_after_rank(false);
1268        }
1269        if head.eq_ignore_ascii_case("APPROX") || head.eq_ignore_ascii_case("APPROXIMATE") {
1270            if !self.consume_ident_ci("RANK")? {
1271                return Err(ParseError::expected(
1272                    vec!["RANK"],
1273                    self.peek(),
1274                    self.position(),
1275                ));
1276            }
1277            return self.parse_rank_after_rank(true);
1278        }
1279        if head.eq_ignore_ascii_case("ZRANK") {
1280            return self.parse_zrank();
1281        }
1282        if head.eq_ignore_ascii_case("ZRANGE") {
1283            return self.parse_zrange();
1284        }
1285        Err(ParseError::expected(
1286            vec!["RANK", "APPROX RANK", "ZRANK", "ZRANGE"],
1287            self.peek(),
1288            self.position(),
1289        ))
1290    }
1291
1292    fn parse_rank_after_rank(&mut self, approximate: bool) -> Result<QueryExpr, ParseError> {
1293        if self.consume(&Token::Of)? {
1294            let entity_id = self.parse_u64_slot("rank entity id")?;
1295            self.expect(Token::In)?;
1296            let ranking = self.expect_ident()?;
1297            let query = RankOfQuery { ranking, entity_id };
1298            return Ok(if approximate {
1299                QueryExpr::ApproxRankOf(query)
1300            } else {
1301                QueryExpr::RankOf(query)
1302            });
1303        }
1304
1305        if !approximate && self.consume(&Token::Range)? {
1306            let lo = self.parse_positive_u64_slot("rank range lower bound")?;
1307            self.expect(Token::To)?;
1308            let hi = self.parse_positive_u64_slot("rank range upper bound")?;
1309            if hi < lo {
1310                return Err(ParseError::value_out_of_range(
1311                    "rank range upper bound",
1312                    "must be greater than or equal to the lower bound",
1313                    self.position(),
1314                ));
1315            }
1316            self.expect(Token::In)?;
1317            let ranking = self.expect_ident()?;
1318            return Ok(QueryExpr::RankRange(RankRangeQuery { ranking, lo, hi }));
1319        }
1320
1321        Err(ParseError::expected(
1322            if approximate {
1323                vec!["OF"]
1324            } else {
1325                vec!["OF", "RANGE"]
1326            },
1327            self.peek(),
1328            self.position(),
1329        ))
1330    }
1331
1332    fn parse_zrank(&mut self) -> Result<QueryExpr, ParseError> {
1333        let ranking = self.expect_ident()?;
1334        let entity_id = self.parse_u64_slot("ZRANK entity id")?;
1335        Ok(QueryExpr::RankOf(RankOfQuery { ranking, entity_id }))
1336    }
1337
1338    fn parse_zrange(&mut self) -> Result<QueryExpr, ParseError> {
1339        let ranking = self.expect_ident()?;
1340        let start = self.parse_u64_slot("ZRANGE start")?;
1341        let stop = self.parse_u64_slot("ZRANGE stop")?;
1342        if stop < start {
1343            return Err(ParseError::value_out_of_range(
1344                "ZRANGE stop",
1345                "must be greater than or equal to start",
1346                self.position(),
1347            ));
1348        }
1349        let _with_scores = self.consume_ident_ci("WITHSCORES")?;
1350        Ok(QueryExpr::RankRange(RankRangeQuery {
1351            ranking,
1352            lo: start + 1,
1353            hi: stop + 1,
1354        }))
1355    }
1356
1357    fn parse_positive_u64_slot(&mut self, field: &'static str) -> Result<u64, ParseError> {
1358        let value = self.parse_u64_slot(field)?;
1359        if value == 0 {
1360            return Err(ParseError::value_out_of_range(
1361                field,
1362                "must be a positive integer",
1363                self.position(),
1364            ));
1365        }
1366        Ok(value)
1367    }
1368
1369    fn parse_u64_slot(&mut self, field: &'static str) -> Result<u64, ParseError> {
1370        let pos = self.position();
1371        if matches!(self.peek(), Token::Minus | Token::Dash) {
1372            return Err(ParseError::value_out_of_range(
1373                field,
1374                "must be an unsigned integer",
1375                pos,
1376            ));
1377        }
1378        let raw = self.parse_integer()?;
1379        u64::try_from(raw)
1380            .map_err(|_| ParseError::value_out_of_range(field, "must be an unsigned integer", pos))
1381    }
1382
1383    /// Parse any SQL/RQL-style command into the canonical SQL frontend IR.
1384    pub fn parse_sql_statement(&mut self) -> Result<SqlStatement, ParseError> {
1385        self.parse_sql_command().map(SqlCommand::into_statement)
1386    }
1387
1388    fn parse_dotted_admin_path(&mut self, lowercase: bool) -> Result<String, ParseError> {
1389        let mut path = self.expect_ident()?;
1390        while self.consume(&Token::Dot)? {
1391            let next = self.expect_ident_or_keyword()?;
1392            path = format!("{path}.{next}");
1393        }
1394        Ok(if lowercase {
1395            path.to_ascii_lowercase()
1396        } else {
1397            path
1398        })
1399    }
1400
1401    /// Parse any SQL/RQL-style command through a single frontend module.
1402    /// Parse a `CREATE ...` statement. Split out of
1403    /// [`parse_sql_command`] so its very large per-arm locals
1404    /// (every CREATE variant's query struct) live in their own
1405    /// stack frame instead of inflating the dispatcher's frame.
1406    /// `parse_sql_command` recurses (CREATE VIEW ... AS <stmt>,
1407    /// nested subqueries), so a fat dispatcher frame stacked on
1408    /// itself overflowed small (2 MiB) worker-thread stacks (#635).
1409    #[inline(never)]
1410    fn parse_create_command(&mut self) -> Result<SqlCommand, ParseError> {
1411        let pos = self.position();
1412        self.advance()?;
1413
1414        // CREATE [OR REPLACE] [MATERIALIZED] VIEW [IF NOT EXISTS] name AS <select>
1415        // Detect the VIEW path early so OR REPLACE / MATERIALIZED modifiers
1416        // don't collide with other CREATE variants (TABLE, INDEX, etc.).
1417        let mut or_replace = false;
1418        if self.consume(&Token::Or)? || self.consume_ident_ci("OR")? {
1419            let _ = self.consume_ident_ci("REPLACE")?;
1420            or_replace = true;
1421        }
1422        let materialized = self.consume(&Token::Materialized)?;
1423        if self.check(&Token::View) {
1424            self.advance()?;
1425            let if_not_exists = self.match_if_not_exists()?;
1426            let name = self.expect_ident()?;
1427            // Issue #584 slice 12 — `WITH RETENTION <duration>`
1428            // on CREATE MATERIALIZED VIEW. Parsed before `AS`
1429            // so the SELECT body parser cannot consume the
1430            // trailing `WITH` for its own (TTL / METADATA /
1431            // …) clauses. Persisted on the view definition;
1432            // the physical sweep against view-backing rows
1433            // activates with the slice-9 row-storage follow-up.
1434            let mut retention_duration_ms: Option<u64> = None;
1435            if self.check(&Token::With) {
1436                self.advance()?;
1437                if !self.consume(&Token::Retention)? && !self.consume_ident_ci("RETENTION")? {
1438                    return Err(ParseError::expected(
1439                        vec!["RETENTION"],
1440                        self.peek(),
1441                        self.position(),
1442                    ));
1443                }
1444                if !materialized {
1445                    return Err(ParseError::new(
1446                        "WITH RETENTION is only valid on \
1447                                 CREATE MATERIALIZED VIEW"
1448                            .to_string(),
1449                        self.position(),
1450                    ));
1451                }
1452                let value = self.parse_float()?;
1453                let unit_mult = self.parse_duration_unit()?;
1454                retention_duration_ms = Some((value * unit_mult).round() as u64);
1455            }
1456            // Accept `AS` — the lexer promotes it to `Token::As`
1457            // (keyword) but some paths still see it as an ident.
1458            if !self.consume(&Token::As)? && !self.consume_ident_ci("AS")? {
1459                return Err(ParseError::expected(
1460                    vec!["AS"],
1461                    self.peek(),
1462                    self.position(),
1463                ));
1464            }
1465            // Recursive parse of the body. Any QueryExpr that the
1466            // rest of the grammar accepts is valid (Select, Join, etc.).
1467            let body = self.parse_sql_command()?.into_query_expr();
1468            // Optional `REFRESH EVERY <duration>` clause on
1469            // materialized views (issue #583 slice 10). The
1470            // background scheduler reads this off the view
1471            // descriptor and ticks the view on its cadence.
1472            let mut refresh_every_ms: Option<u64> = None;
1473            if self.check(&Token::Refresh) {
1474                if !materialized {
1475                    return Err(ParseError::new(
1476                        "REFRESH EVERY is only valid on \
1477                                 CREATE MATERIALIZED VIEW"
1478                            .to_string(),
1479                        self.position(),
1480                    ));
1481                }
1482                self.advance()?;
1483                if !self.consume_ident_ci("EVERY")? {
1484                    return Err(ParseError::expected(
1485                        vec!["EVERY"],
1486                        self.peek(),
1487                        self.position(),
1488                    ));
1489                }
1490                let value = self.parse_float()?;
1491                let unit_mult = self.parse_duration_unit()?;
1492                refresh_every_ms = Some((value * unit_mult).round() as u64);
1493            }
1494            return Ok(SqlCommand::CreateView(CreateViewQuery {
1495                name,
1496                query: Box::new(body),
1497                materialized,
1498                if_not_exists,
1499                or_replace,
1500                refresh_every_ms,
1501                retention_duration_ms,
1502            }));
1503        }
1504        // If OR REPLACE / MATERIALIZED was consumed but VIEW was not,
1505        // bail out — no other CREATE form accepts those modifiers.
1506        if or_replace || materialized {
1507            return Err(ParseError::expected(
1508                vec!["VIEW"],
1509                self.peek(),
1510                self.position(),
1511            ));
1512        }
1513
1514        if self.check(&Token::Index) || self.check(&Token::Unique) {
1515            match self.parse_create_index_query()? {
1516                QueryExpr::CreateIndex(query) => Ok(SqlCommand::CreateIndex(query)),
1517                other => Err(ParseError::new(
1518                    format!("internal: CREATE INDEX produced unexpected kind {other:?}"),
1519                    self.position(),
1520                )),
1521            }
1522        } else if self.check(&Token::Table) {
1523            self.expect(Token::Table)?;
1524            match self.parse_create_table_body()? {
1525                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1526                other => Err(ParseError::new(
1527                    format!("internal: CREATE TABLE produced unexpected kind {other:?}"),
1528                    self.position(),
1529                )),
1530            }
1531        } else if self.check(&Token::Graph) {
1532            self.advance()?;
1533            match self.parse_create_collection_model_body(CollectionModel::Graph)? {
1534                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1535                other => Err(ParseError::new(
1536                    format!("internal: CREATE GRAPH produced unexpected kind {other:?}"),
1537                    self.position(),
1538                )),
1539            }
1540        } else if self.check(&Token::Document) {
1541            self.advance()?;
1542            match self.parse_create_collection_model_body(CollectionModel::Document)? {
1543                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1544                other => Err(ParseError::new(
1545                    format!("internal: CREATE DOCUMENT produced unexpected kind {other:?}"),
1546                    self.position(),
1547                )),
1548            }
1549        } else if self.check(&Token::Vector) {
1550            self.advance()?;
1551            match self.parse_create_vector_body()? {
1552                QueryExpr::CreateVector(query) => Ok(SqlCommand::CreateVector(query)),
1553                other => Err(ParseError::new(
1554                    format!("internal: CREATE VECTOR produced unexpected kind {other:?}"),
1555                    self.position(),
1556                )),
1557            }
1558        } else if self.check(&Token::Collection) {
1559            self.advance()?;
1560            match self.parse_create_collection_body()? {
1561                QueryExpr::CreateCollection(query) => Ok(SqlCommand::CreateCollection(query)),
1562                other => Err(ParseError::new(
1563                    format!("internal: CREATE COLLECTION produced unexpected kind {other:?}"),
1564                    self.position(),
1565                )),
1566            }
1567        } else if self.check(&Token::Kv) {
1568            self.advance()?;
1569            match self.parse_create_keyed_body(CollectionModel::Kv)? {
1570                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1571                other => Err(ParseError::new(
1572                    format!("internal: CREATE KV produced unexpected kind {other:?}"),
1573                    self.position(),
1574                )),
1575            }
1576        } else if self.consume_ident_ci("CONFIG")? {
1577            match self.parse_create_keyed_body(CollectionModel::Config)? {
1578                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1579                other => Err(ParseError::new(
1580                    format!("internal: CREATE CONFIG produced unexpected kind {other:?}"),
1581                    self.position(),
1582                )),
1583            }
1584        } else if self.consume_ident_ci("VAULT")? {
1585            match self.parse_create_keyed_body(CollectionModel::Vault)? {
1586                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1587                other => Err(ParseError::new(
1588                    format!("internal: CREATE VAULT produced unexpected kind {other:?}"),
1589                    self.position(),
1590                )),
1591            }
1592        } else if self.check(&Token::Timeseries) {
1593            self.advance()?;
1594            match self.parse_create_timeseries_body()? {
1595                QueryExpr::CreateTimeSeries(query) => Ok(SqlCommand::CreateTimeSeries(query)),
1596                other => Err(ParseError::new(
1597                    format!("internal: CREATE TIMESERIES produced unexpected kind {other:?}"),
1598                    self.position(),
1599                )),
1600            }
1601        } else if self.check(&Token::Metric) {
1602            self.advance()?;
1603            match self.parse_create_metric_body()? {
1604                QueryExpr::CreateMetric(query) => Ok(SqlCommand::CreateMetric(query)),
1605                other => Err(ParseError::new(
1606                    format!("internal: CREATE METRIC produced unexpected kind {other:?}"),
1607                    self.position(),
1608                )),
1609            }
1610        } else if self.consume_ident_ci("METRICS")? {
1611            match self.parse_create_metrics_body()? {
1612                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1613                other => Err(ParseError::new(
1614                    format!("internal: CREATE METRICS produced unexpected kind {other:?}"),
1615                    self.position(),
1616                )),
1617            }
1618        } else if self.consume_ident_ci("SLO")? {
1619            match self.parse_create_slo_body()? {
1620                QueryExpr::CreateSlo(query) => Ok(SqlCommand::CreateSlo(query)),
1621                other => Err(ParseError::new(
1622                    format!("internal: CREATE SLO produced unexpected kind {other:?}"),
1623                    self.position(),
1624                )),
1625            }
1626        } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE")) {
1627            self.advance()?;
1628            match self.parse_create_hypertable_body()? {
1629                QueryExpr::CreateTimeSeries(query) => Ok(SqlCommand::CreateTimeSeries(query)),
1630                other => Err(ParseError::new(
1631                    format!("internal: CREATE HYPERTABLE produced unexpected kind {other:?}"),
1632                    self.position(),
1633                )),
1634            }
1635        } else if self.check(&Token::Queue) {
1636            self.advance()?;
1637            match self.parse_create_queue_body()? {
1638                QueryExpr::CreateQueue(query) => Ok(SqlCommand::CreateQueue(query)),
1639                other => Err(ParseError::new(
1640                    format!("internal: CREATE QUEUE produced unexpected kind {other:?}"),
1641                    self.position(),
1642                )),
1643            }
1644        } else if self.check(&Token::Tree) {
1645            self.advance()?;
1646            match self.parse_create_tree_body()? {
1647                QueryExpr::CreateTree(query) => Ok(SqlCommand::CreateTree(query)),
1648                other => Err(ParseError::new(
1649                    format!("internal: CREATE TREE produced unexpected kind {other:?}"),
1650                    self.position(),
1651                )),
1652            }
1653        } else if matches!(self.peek(), Token::Ident(n) if
1654                    n.eq_ignore_ascii_case("HLL") ||
1655                    n.eq_ignore_ascii_case("SKETCH") ||
1656                    n.eq_ignore_ascii_case("FILTER"))
1657        {
1658            match self.parse_create_probabilistic()? {
1659                QueryExpr::ProbabilisticCommand(command) => Ok(SqlCommand::Probabilistic(command)),
1660                other => Err(ParseError::new(
1661                    format!("internal: CREATE probabilistic produced unexpected kind {other:?}"),
1662                    self.position(),
1663                )),
1664            }
1665        } else if self.check(&Token::Schema) {
1666            // CREATE SCHEMA [IF NOT EXISTS] name
1667            self.advance()?;
1668            let if_not_exists = self.match_if_not_exists()?;
1669            let name = self.expect_ident()?;
1670            Ok(SqlCommand::CreateSchema(CreateSchemaQuery {
1671                name,
1672                if_not_exists,
1673            }))
1674        } else if self.check(&Token::Policy) {
1675            // Two forms share the leading `CREATE POLICY` tokens:
1676            //   * IAM:   CREATE POLICY '<id>' AS '<json>'          (string literal id)
1677            //   * RLS:   CREATE POLICY <name> ON <target> ...      (bare ident name)
1678            // Disambiguate by peeking the token after POLICY.
1679            self.advance()?;
1680            if matches!(self.peek(), Token::String(_)) {
1681                // IAM form — short-circuit out of the SQL command stack.
1682                let expr = self.parse_create_iam_policy_after_keywords()?;
1683                // Inline command-wrapping: produce a synthetic SqlCommand by
1684                // routing through a generic IAM admin holder. We don't
1685                // have a dedicated SqlCommand variant for IAM yet, so we
1686                // bounce through the existing Grant-shaped Admin slot
1687                // which expects no further tokens.
1688                return Ok(SqlCommand::IamPolicy(expr));
1689            }
1690            let name = self.expect_ident()?;
1691            self.expect(Token::On)?;
1692
1693            let (target_kind, table) = {
1694                use crate::storage::query::ast::PolicyTargetKind;
1695                let kw = match self.peek() {
1696                    Token::Ident(s) => Some(s.to_ascii_uppercase()),
1697                    _ => None,
1698                };
1699                let kind = kw.as_deref().and_then(|k| match k {
1700                    "NODES" => Some(PolicyTargetKind::Nodes),
1701                    "EDGES" => Some(PolicyTargetKind::Edges),
1702                    "VECTORS" => Some(PolicyTargetKind::Vectors),
1703                    "MESSAGES" => Some(PolicyTargetKind::Messages),
1704                    "POINTS" => Some(PolicyTargetKind::Points),
1705                    "DOCUMENTS" => Some(PolicyTargetKind::Documents),
1706                    _ => None,
1707                });
1708                if let Some(k) = kind {
1709                    self.advance()?;
1710                    self.expect(Token::Of)?;
1711                    let coll = self.expect_ident()?;
1712                    (k, coll)
1713                } else {
1714                    let coll = self.expect_ident()?;
1715                    (PolicyTargetKind::Table, coll)
1716                }
1717            };
1718
1719            let action = if self.consume(&Token::For)? {
1720                let a = match self.peek() {
1721                    Token::Select => {
1722                        self.advance()?;
1723                        Some(PolicyAction::Select)
1724                    }
1725                    Token::Insert => {
1726                        self.advance()?;
1727                        Some(PolicyAction::Insert)
1728                    }
1729                    Token::Update => {
1730                        self.advance()?;
1731                        Some(PolicyAction::Update)
1732                    }
1733                    Token::Delete => {
1734                        self.advance()?;
1735                        Some(PolicyAction::Delete)
1736                    }
1737                    Token::All => {
1738                        self.advance()?;
1739                        None
1740                    }
1741                    _ => None,
1742                };
1743                a
1744            } else {
1745                None
1746            };
1747
1748            let role = if self.consume(&Token::To)? {
1749                Some(self.expect_ident()?)
1750            } else {
1751                None
1752            };
1753
1754            self.expect(Token::Using)?;
1755            self.expect(Token::LParen)?;
1756            let filter = self.parse_filter()?;
1757            self.expect(Token::RParen)?;
1758
1759            Ok(SqlCommand::CreatePolicy(CreatePolicyQuery {
1760                name,
1761                table,
1762                action,
1763                role,
1764                using: Box::new(filter),
1765                target_kind,
1766            }))
1767        } else if self.check(&Token::Server) {
1768            // CREATE SERVER [IF NOT EXISTS] name
1769            //   FOREIGN DATA WRAPPER kind
1770            //   [OPTIONS (key 'value', ...)]
1771            self.advance()?;
1772            let if_not_exists = self.match_if_not_exists()?;
1773            let name = self.expect_ident()?;
1774            self.expect(Token::Foreign)?;
1775            self.expect(Token::Data)?;
1776            self.expect(Token::Wrapper)?;
1777            let wrapper = self.expect_ident()?;
1778            let options = self.parse_fdw_options_clause()?;
1779            Ok(SqlCommand::CreateServer(CreateServerQuery {
1780                name,
1781                wrapper,
1782                options,
1783                if_not_exists,
1784            }))
1785        } else if self.check(&Token::Foreign) {
1786            // CREATE FOREIGN TABLE [IF NOT EXISTS] name (cols)
1787            //   SERVER server_name
1788            //   [OPTIONS (key 'value', ...)]
1789            self.advance()?;
1790            self.expect(Token::Table)?;
1791            let if_not_exists = self.match_if_not_exists()?;
1792            let name = self.expect_ident()?;
1793            self.expect(Token::LParen)?;
1794            let mut columns = Vec::new();
1795            loop {
1796                let col_name = self.expect_ident()?;
1797                let data_type = self.expect_ident_or_keyword()?;
1798                // Inline NOT NULL check — the CREATE TABLE path's helper is
1799                // private and coupling to it just for FDW columns isn't worth it.
1800                let mut not_null = false;
1801                if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NOT")) {
1802                    self.advance()?;
1803                    if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NULL")) {
1804                        self.advance()?;
1805                        not_null = true;
1806                    }
1807                }
1808                columns.push(ForeignColumnDef {
1809                    name: col_name,
1810                    data_type,
1811                    not_null,
1812                });
1813                if !self.consume(&Token::Comma)? {
1814                    break;
1815                }
1816            }
1817            self.expect(Token::RParen)?;
1818            self.expect(Token::Server)?;
1819            let server = self.expect_ident()?;
1820            let options = self.parse_fdw_options_clause()?;
1821            Ok(SqlCommand::CreateForeignTable(CreateForeignTableQuery {
1822                name,
1823                server,
1824                columns,
1825                options,
1826                if_not_exists,
1827            }))
1828        } else if self.check(&Token::Sequence) {
1829            // CREATE SEQUENCE [IF NOT EXISTS] name
1830            //   [START [WITH] n] [INCREMENT [BY] n]
1831            self.advance()?;
1832            let if_not_exists = self.match_if_not_exists()?;
1833            let name = self.expect_ident()?;
1834            let mut start: i64 = 1;
1835            let mut increment: i64 = 1;
1836            // Loop over optional clauses in any order.
1837            loop {
1838                if self.consume(&Token::Start)? {
1839                    // Accept `START 100` or `START WITH 100`.
1840                    let _ = self.consume(&Token::With)? || self.consume_ident_ci("WITH")?;
1841                    start = self.parse_integer()?;
1842                } else if self.consume(&Token::Increment)? {
1843                    // Accept `INCREMENT 5` or `INCREMENT BY 5`.
1844                    let _ = self.consume(&Token::By)? || self.consume_ident_ci("BY")?;
1845                    increment = self.parse_integer()?;
1846                } else {
1847                    break;
1848                }
1849            }
1850            Ok(SqlCommand::CreateSequence(CreateSequenceQuery {
1851                name,
1852                if_not_exists,
1853                start,
1854                increment,
1855            }))
1856        } else if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
1857            self.advance()?; // consume MIGRATION
1858            match self.parse_create_migration_body()? {
1859                QueryExpr::CreateMigration(q) => Ok(SqlCommand::CreateMigration(q)),
1860                other => Err(ParseError::new(
1861                    format!("internal: CREATE MIGRATION produced unexpected kind {other:?}"),
1862                    self.position(),
1863                )),
1864            }
1865        } else if let Some(reason) = analytics_v0_non_goal_create(self.peek()) {
1866            // Issue #789 — enforce Analytics v0 non-goals at the parser
1867            // surface. The parent PRD (#782) explicitly excludes generic
1868            // analytics objects, a new event storage model, cohorts,
1869            // funnels, SLA contracts, and adapters from v0. Reject these
1870            // CREATE forms here with a stable, non-goal-specific message
1871            // so accidental use surfaces an obvious "out of scope for v0"
1872            // error rather than the generic CREATE fallback.
1873            Err(ParseError::new(reason, self.position()))
1874        } else if let Some(err) =
1875            ParseError::unsupported_recognized_token(self.peek(), self.position())
1876        {
1877            Err(err)
1878        } else {
1879            Err(ParseError::expected(
1880                vec![
1881                    "TABLE",
1882                    "GRAPH",
1883                    "VECTOR",
1884                    "DOCUMENT",
1885                    "KV",
1886                    "COLLECTION",
1887                    "INDEX",
1888                    "UNIQUE",
1889                    "METRIC",
1890                    "TIMESERIES",
1891                    "QUEUE",
1892                    "TREE",
1893                    "HLL",
1894                    "SKETCH",
1895                    "FILTER",
1896                    "SCHEMA",
1897                    "SEQUENCE",
1898                    "MIGRATION",
1899                ],
1900                self.peek(),
1901                pos,
1902            ))
1903        }
1904    }
1905
1906    pub fn parse_sql_command(&mut self) -> Result<SqlCommand, ParseError> {
1907        match self.peek() {
1908            Token::Select => match self.parse_select_query()? {
1909                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1910                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1911                other => Err(ParseError::new(
1912                    format!("internal: SELECT produced unexpected query kind {other:?}"),
1913                    self.position(),
1914                )),
1915            },
1916            Token::From => match self.parse_from_query()? {
1917                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1918                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1919                other => Err(ParseError::new(
1920                    format!("internal: FROM produced unexpected query kind {other:?}"),
1921                    self.position(),
1922                )),
1923            },
1924            Token::Insert => match self.parse_insert_query()? {
1925                QueryExpr::Insert(query) => Ok(SqlCommand::Insert(query)),
1926                other => Err(ParseError::new(
1927                    format!("internal: INSERT produced unexpected query kind {other:?}"),
1928                    self.position(),
1929                )),
1930            },
1931            Token::Update => match self.parse_update_query()? {
1932                QueryExpr::Update(query) => Ok(SqlCommand::Update(query)),
1933                other => Err(ParseError::new(
1934                    format!("internal: UPDATE produced unexpected query kind {other:?}"),
1935                    self.position(),
1936                )),
1937            },
1938            Token::Delete => {
1939                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("SECRET"))
1940                {
1941                    self.advance()?; // DELETE
1942                    self.advance()?; // SECRET
1943                    let key = self.parse_dotted_admin_path(true)?;
1944                    Ok(SqlCommand::DeleteSecret { key })
1945                } else {
1946                    match self.parse_delete_query()? {
1947                        QueryExpr::Delete(query) => Ok(SqlCommand::Delete(query)),
1948                        other => Err(ParseError::new(
1949                            format!("internal: DELETE produced unexpected query kind {other:?}"),
1950                            self.position(),
1951                        )),
1952                    }
1953                }
1954            }
1955            Token::Truncate => {
1956                self.advance()?;
1957                let model = if self.consume(&Token::Table)? {
1958                    Some(CollectionModel::Table)
1959                } else if self.consume(&Token::Graph)? {
1960                    Some(CollectionModel::Graph)
1961                } else if self.consume(&Token::Vector)? {
1962                    Some(CollectionModel::Vector)
1963                } else if self.consume(&Token::Document)? {
1964                    Some(CollectionModel::Document)
1965                } else if self.consume(&Token::Timeseries)? {
1966                    Some(CollectionModel::TimeSeries)
1967                } else if self.consume_ident_ci("METRICS")? {
1968                    Some(CollectionModel::Metrics)
1969                } else if self.consume(&Token::Kv)? {
1970                    Some(CollectionModel::Kv)
1971                } else if self.consume(&Token::Queue)? {
1972                    Some(CollectionModel::Queue)
1973                } else if self.consume(&Token::Collection)? {
1974                    None
1975                } else {
1976                    return Err(ParseError::expected(
1977                        vec![
1978                            "TABLE",
1979                            "GRAPH",
1980                            "VECTOR",
1981                            "DOCUMENT",
1982                            "TIMESERIES",
1983                            "METRICS",
1984                            "KV",
1985                            "QUEUE",
1986                            "COLLECTION",
1987                        ],
1988                        self.peek(),
1989                        self.position(),
1990                    ));
1991                };
1992                match self.parse_truncate_body(model)? {
1993                    QueryExpr::Truncate(query) => Ok(SqlCommand::Truncate(query)),
1994                    other => Err(ParseError::new(
1995                        format!("internal: TRUNCATE produced unexpected kind {other:?}"),
1996                        self.position(),
1997                    )),
1998                }
1999            }
2000            Token::Explain => {
2001                // Peek ahead: EXPLAIN MIGRATION name → ExplainMigration
2002                // EXPLAIN ALTER FOR ... → ExplainAlter (existing path)
2003                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
2004                {
2005                    self.advance()?; // consume EXPLAIN
2006                    match self.parse_explain_migration_after_keyword()? {
2007                        QueryExpr::ExplainMigration(q) => Ok(SqlCommand::ExplainMigration(q)),
2008                        other => Err(ParseError::new(
2009                            format!(
2010                                "internal: EXPLAIN MIGRATION produced unexpected kind {other:?}"
2011                            ),
2012                            self.position(),
2013                        )),
2014                    }
2015                } else {
2016                    match self.parse_explain_alter_query()? {
2017                        QueryExpr::ExplainAlter(query) => Ok(SqlCommand::ExplainAlter(query)),
2018                        other => Err(ParseError::new(
2019                            format!("internal: EXPLAIN produced unexpected query kind {other:?}"),
2020                            self.position(),
2021                        )),
2022                    }
2023                }
2024            }
2025            Token::Create => self.parse_create_command(),
2026            Token::Drop => {
2027                let pos = self.position();
2028                self.advance()?;
2029
2030                // DROP [MATERIALIZED] VIEW [IF EXISTS] name
2031                let materialized = self.consume(&Token::Materialized)?;
2032                if self.check(&Token::View) {
2033                    self.advance()?;
2034                    let if_exists = self.match_if_exists()?;
2035                    let name = self.expect_ident()?;
2036                    return Ok(SqlCommand::DropView(DropViewQuery {
2037                        name,
2038                        materialized,
2039                        if_exists,
2040                    }));
2041                }
2042                if materialized {
2043                    return Err(ParseError::expected(
2044                        vec!["VIEW"],
2045                        self.peek(),
2046                        self.position(),
2047                    ));
2048                }
2049
2050                if self.check(&Token::Index) {
2051                    match self.parse_drop_index_query()? {
2052                        QueryExpr::DropIndex(query) => Ok(SqlCommand::DropIndex(query)),
2053                        other => Err(ParseError::new(
2054                            format!("internal: DROP INDEX produced unexpected kind {other:?}"),
2055                            self.position(),
2056                        )),
2057                    }
2058                } else if self.check(&Token::Table) {
2059                    self.expect(Token::Table)?;
2060                    match self.parse_drop_table_body()? {
2061                        QueryExpr::DropTable(query) => Ok(SqlCommand::DropTable(query)),
2062                        other => Err(ParseError::new(
2063                            format!("internal: DROP TABLE produced unexpected kind {other:?}"),
2064                            self.position(),
2065                        )),
2066                    }
2067                } else if self.check(&Token::Graph) {
2068                    self.advance()?;
2069                    match self.parse_drop_graph_body()? {
2070                        QueryExpr::DropGraph(query) => Ok(SqlCommand::DropGraph(query)),
2071                        other => Err(ParseError::new(
2072                            format!("internal: DROP GRAPH produced unexpected kind {other:?}"),
2073                            self.position(),
2074                        )),
2075                    }
2076                } else if self.check(&Token::Vector) {
2077                    self.advance()?;
2078                    match self.parse_drop_vector_body()? {
2079                        QueryExpr::DropVector(query) => Ok(SqlCommand::DropVector(query)),
2080                        other => Err(ParseError::new(
2081                            format!("internal: DROP VECTOR produced unexpected kind {other:?}"),
2082                            self.position(),
2083                        )),
2084                    }
2085                } else if self.check(&Token::Document) {
2086                    self.advance()?;
2087                    match self.parse_drop_document_body()? {
2088                        QueryExpr::DropDocument(query) => Ok(SqlCommand::DropDocument(query)),
2089                        other => Err(ParseError::new(
2090                            format!("internal: DROP DOCUMENT produced unexpected kind {other:?}"),
2091                            self.position(),
2092                        )),
2093                    }
2094                } else if self.check(&Token::Kv) {
2095                    self.advance()?;
2096                    match self.parse_drop_kv_body()? {
2097                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
2098                        other => Err(ParseError::new(
2099                            format!("internal: DROP KV produced unexpected kind {other:?}"),
2100                            self.position(),
2101                        )),
2102                    }
2103                } else if self.consume_ident_ci("CONFIG")? {
2104                    match self.parse_drop_keyed_body(CollectionModel::Config)? {
2105                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
2106                        other => Err(ParseError::new(
2107                            format!("internal: DROP CONFIG produced unexpected kind {other:?}"),
2108                            self.position(),
2109                        )),
2110                    }
2111                } else if self.consume_ident_ci("VAULT")? {
2112                    match self.parse_drop_keyed_body(CollectionModel::Vault)? {
2113                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
2114                        other => Err(ParseError::new(
2115                            format!("internal: DROP VAULT produced unexpected kind {other:?}"),
2116                            self.position(),
2117                        )),
2118                    }
2119                } else if self.check(&Token::Collection) {
2120                    self.advance()?;
2121                    match self.parse_drop_collection_body()? {
2122                        QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
2123                        other => Err(ParseError::new(
2124                            format!("internal: DROP COLLECTION produced unexpected kind {other:?}"),
2125                            self.position(),
2126                        )),
2127                    }
2128                } else if self.check(&Token::Timeseries) {
2129                    self.advance()?;
2130                    match self.parse_drop_timeseries_body()? {
2131                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
2132                        other => Err(ParseError::new(
2133                            format!("internal: DROP TIMESERIES produced unexpected kind {other:?}"),
2134                            self.position(),
2135                        )),
2136                    }
2137                } else if self.consume_ident_ci("METRICS")? {
2138                    match self.parse_drop_collection_model_body(Some(CollectionModel::Metrics))? {
2139                        QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
2140                        other => Err(ParseError::new(
2141                            format!("internal: DROP METRICS produced unexpected kind {other:?}"),
2142                            self.position(),
2143                        )),
2144                    }
2145                } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
2146                {
2147                    // DROP HYPERTABLE name reuses the same AST as
2148                    // DROP TIMESERIES — runtime clears the registry
2149                    // entry *and* drops the backing collection.
2150                    self.advance()?;
2151                    match self.parse_drop_timeseries_body()? {
2152                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
2153                        other => Err(ParseError::new(
2154                            format!("internal: DROP HYPERTABLE produced unexpected kind {other:?}"),
2155                            self.position(),
2156                        )),
2157                    }
2158                } else if self.check(&Token::Queue) {
2159                    self.advance()?;
2160                    match self.parse_drop_queue_body()? {
2161                        QueryExpr::DropQueue(query) => Ok(SqlCommand::DropQueue(query)),
2162                        other => Err(ParseError::new(
2163                            format!("internal: DROP QUEUE produced unexpected kind {other:?}"),
2164                            self.position(),
2165                        )),
2166                    }
2167                } else if self.check(&Token::Tree) {
2168                    self.advance()?;
2169                    match self.parse_drop_tree_body()? {
2170                        QueryExpr::DropTree(query) => Ok(SqlCommand::DropTree(query)),
2171                        other => Err(ParseError::new(
2172                            format!("internal: DROP TREE produced unexpected kind {other:?}"),
2173                            self.position(),
2174                        )),
2175                    }
2176                } else if matches!(self.peek(), Token::Ident(n) if
2177                    n.eq_ignore_ascii_case("HLL") ||
2178                    n.eq_ignore_ascii_case("SKETCH") ||
2179                    n.eq_ignore_ascii_case("FILTER"))
2180                {
2181                    match self.parse_drop_probabilistic()? {
2182                        QueryExpr::ProbabilisticCommand(command) => {
2183                            Ok(SqlCommand::Probabilistic(command))
2184                        }
2185                        other => Err(ParseError::new(
2186                            format!(
2187                                "internal: DROP probabilistic produced unexpected kind {other:?}"
2188                            ),
2189                            self.position(),
2190                        )),
2191                    }
2192                } else if self.check(&Token::Schema) {
2193                    // DROP SCHEMA [IF EXISTS] name [CASCADE]
2194                    self.advance()?;
2195                    let if_exists = self.match_if_exists()?;
2196                    let name = self.expect_ident()?;
2197                    let cascade = self.consume(&Token::Cascade)?;
2198                    Ok(SqlCommand::DropSchema(DropSchemaQuery {
2199                        name,
2200                        if_exists,
2201                        cascade,
2202                    }))
2203                } else if self.check(&Token::Policy) {
2204                    // Two forms:
2205                    //   * IAM:   DROP POLICY '<id>'
2206                    //   * RLS:   DROP POLICY [IF EXISTS] name ON table
2207                    self.advance()?;
2208                    if matches!(self.peek(), Token::String(_)) {
2209                        let expr = self.parse_drop_iam_policy_after_keywords()?;
2210                        return Ok(SqlCommand::IamPolicy(expr));
2211                    }
2212                    let if_exists = self.match_if_exists()?;
2213                    let name = self.expect_ident()?;
2214                    self.expect(Token::On)?;
2215                    let table = self.expect_ident()?;
2216                    Ok(SqlCommand::DropPolicy(DropPolicyQuery {
2217                        name,
2218                        table,
2219                        if_exists,
2220                    }))
2221                } else if self.check(&Token::Server) {
2222                    // DROP SERVER [IF EXISTS] name [CASCADE]
2223                    self.advance()?;
2224                    let if_exists = self.match_if_exists()?;
2225                    let name = self.expect_ident()?;
2226                    let cascade = self.consume(&Token::Cascade)?;
2227                    Ok(SqlCommand::DropServer(DropServerQuery {
2228                        name,
2229                        if_exists,
2230                        cascade,
2231                    }))
2232                } else if self.check(&Token::Foreign) {
2233                    // DROP FOREIGN TABLE [IF EXISTS] name
2234                    self.advance()?;
2235                    self.expect(Token::Table)?;
2236                    let if_exists = self.match_if_exists()?;
2237                    let name = self.expect_ident()?;
2238                    Ok(SqlCommand::DropForeignTable(DropForeignTableQuery {
2239                        name,
2240                        if_exists,
2241                    }))
2242                } else if self.check(&Token::Sequence) {
2243                    // DROP SEQUENCE [IF EXISTS] name
2244                    self.advance()?;
2245                    let if_exists = self.match_if_exists()?;
2246                    let name = self.expect_ident()?;
2247                    Ok(SqlCommand::DropSequence(DropSequenceQuery {
2248                        name,
2249                        if_exists,
2250                    }))
2251                } else if let Some(err) =
2252                    ParseError::unsupported_recognized_token(self.peek(), self.position())
2253                {
2254                    Err(err)
2255                } else {
2256                    Err(ParseError::expected(
2257                        vec![
2258                            "TABLE",
2259                            "INDEX",
2260                            "TIMESERIES",
2261                            "QUEUE",
2262                            "TREE",
2263                            "HLL",
2264                            "SKETCH",
2265                            "FILTER",
2266                            "SCHEMA",
2267                            "SEQUENCE",
2268                        ],
2269                        self.peek(),
2270                        pos,
2271                    ))
2272                }
2273            }
2274            Token::Alter => {
2275                // Disambiguate ALTER USER / ALTER QUEUE / ALTER TABLE without
2276                // committing to a path until we've seen the target.
2277                // We peek the *next* token (without consuming) and
2278                // dispatch accordingly.
2279                let next = self.peek_next()?.clone();
2280                if matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("USER")) {
2281                    self.advance()?; // consume ALTER
2282                    let stmt = self.parse_alter_user_statement()?;
2283                    Ok(SqlCommand::AlterUser(stmt))
2284                } else if matches!(next, Token::Queue) {
2285                    self.advance()?; // consume ALTER
2286                    self.advance()?; // consume QUEUE
2287                    match self.parse_alter_queue_body()? {
2288                        QueryExpr::AlterQueue(query) => Ok(SqlCommand::AlterQueue(query)),
2289                        other => Err(ParseError::new(
2290                            format!("internal: ALTER QUEUE produced unexpected kind {other:?}"),
2291                            self.position(),
2292                        )),
2293                    }
2294                } else if matches!(next, Token::Metric) {
2295                    self.advance()?; // consume ALTER
2296                    self.advance()?; // consume METRIC
2297                    match self.parse_alter_metric_body()? {
2298                        QueryExpr::AlterMetric(query) => Ok(SqlCommand::AlterMetric(query)),
2299                        other => Err(ParseError::new(
2300                            format!("internal: ALTER METRIC produced unexpected kind {other:?}"),
2301                            self.position(),
2302                        )),
2303                    }
2304                } else if matches!(next, Token::Graph) {
2305                    // Issue #801 — `ALTER GRAPH name ADD|DROP ANALYTICS ...`
2306                    // shares the AlterTable AST so analytics-config lifecycle
2307                    // mutations dispatch through the existing executor path.
2308                    match self.parse_alter_graph_query()? {
2309                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2310                        other => Err(ParseError::new(
2311                            format!(
2312                                "internal: ALTER GRAPH produced unexpected query kind {other:?}"
2313                            ),
2314                            self.position(),
2315                        )),
2316                    }
2317                } else if matches!(next, Token::Table)
2318                    || matches!(next, Token::Collection)
2319                    || matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("COLLECTION"))
2320                {
2321                    // Issue #522 — `ALTER COLLECTION` shares the AlterTable
2322                    // AST so signer-registry mutations dispatch through the
2323                    // existing executor. The DDL parser body accepts either
2324                    // keyword interchangeably for the open-vocabulary alters
2325                    // we own (currently `ADD|REVOKE SIGNER`).
2326                    match self.parse_alter_table_query()? {
2327                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2328                        other => Err(ParseError::new(
2329                            format!(
2330                                "internal: ALTER TABLE produced unexpected query kind {other:?}"
2331                            ),
2332                            self.position(),
2333                        )),
2334                    }
2335                } else if let Some(err) =
2336                    ParseError::unsupported_recognized_token(&next, self.position())
2337                {
2338                    Err(err)
2339                } else {
2340                    match self.parse_alter_table_query()? {
2341                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2342                        other => Err(ParseError::new(
2343                            format!("internal: ALTER produced unexpected query kind {other:?}"),
2344                            self.position(),
2345                        )),
2346                    }
2347                }
2348            }
2349            Token::Ident(name) if name.eq_ignore_ascii_case("GRANT") => {
2350                let stmt = self.parse_grant_statement()?;
2351                Ok(SqlCommand::Grant(stmt))
2352            }
2353            Token::Ident(name) if name.eq_ignore_ascii_case("REVOKE") => {
2354                let stmt = self.parse_revoke_statement()?;
2355                Ok(SqlCommand::Revoke(stmt))
2356            }
2357            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
2358                self.advance()?;
2359                if self.consume_ident_ci("BACKFILL")? {
2360                    return Err(ParseError::new(
2361                        "EVENTS BACKFILL STATUS is not implemented; EVENTS BACKFILL runtime is available but durable progress tracking is not"
2362                            .to_string(),
2363                        self.position(),
2364                    ));
2365                }
2366                if !self.consume_ident_ci("STATUS")? {
2367                    return Err(ParseError::expected(
2368                        vec!["STATUS"],
2369                        self.peek(),
2370                        self.position(),
2371                    ));
2372                }
2373
2374                let mut query = TableQuery::new("red.subscriptions");
2375                let collection = match self.peek().clone() {
2376                    Token::Ident(name) => {
2377                        self.advance()?;
2378                        Some(name)
2379                    }
2380                    Token::String(name) => {
2381                        self.advance()?;
2382                        Some(name)
2383                    }
2384                    _ => None,
2385                };
2386                self.parse_table_clauses(&mut query)?;
2387                if let Some(collection) = collection {
2388                    let filter = Filter::compare(
2389                        FieldRef::column("red.subscriptions", "collection"),
2390                        CompareOp::Eq,
2391                        Value::text(collection),
2392                    );
2393                    let expr = filter_to_expr(&filter);
2394                    query.where_expr = Some(match query.where_expr.take() {
2395                        Some(existing) => Expr::binop(BinOp::And, existing, expr),
2396                        None => expr,
2397                    });
2398                    query.filter = Some(match query.filter.take() {
2399                        Some(existing) => existing.and(filter),
2400                        None => filter,
2401                    });
2402                }
2403                Ok(SqlCommand::Select(query))
2404            }
2405            Token::Attach => {
2406                let expr = self.parse_attach_policy()?;
2407                Ok(SqlCommand::IamPolicy(expr))
2408            }
2409            Token::Detach => {
2410                let expr = self.parse_detach_policy()?;
2411                Ok(SqlCommand::IamPolicy(expr))
2412            }
2413            Token::Ident(name) if name.eq_ignore_ascii_case("SIMULATE") => {
2414                let expr = self.parse_simulate_policy()?;
2415                Ok(SqlCommand::IamPolicy(expr))
2416            }
2417            Token::Ident(name) if name.eq_ignore_ascii_case("LINT") => {
2418                let expr = self.parse_lint_policy()?;
2419                Ok(SqlCommand::IamPolicy(expr))
2420            }
2421            Token::Ident(name) if name.eq_ignore_ascii_case("MIGRATE") => {
2422                // `MIGRATE POLICY MODE TO ...` is the S5B (#714) path.
2423                // Lookahead one token because `MIGRATE` is otherwise
2424                // unused at this layer.
2425                let next = self.peek_next()?.clone();
2426                let is_policy_mode = matches!(&next, Token::Policy)
2427                    || matches!(&next, Token::Ident(name)
2428                        if name.eq_ignore_ascii_case("POLICY"));
2429                if is_policy_mode {
2430                    let expr = self.parse_migrate_policy_mode()?;
2431                    return Ok(SqlCommand::IamPolicy(expr));
2432                }
2433                Err(ParseError::expected(
2434                    vec!["POLICY"],
2435                    self.peek(),
2436                    self.position(),
2437                ))
2438            }
2439            Token::Set => {
2440                self.advance()?;
2441                if self.consume_ident_ci("CONFIG")? {
2442                    let full_key = self.parse_dotted_admin_path(true)?;
2443                    self.expect(Token::Eq)?;
2444                    let value = self.parse_literal_value()?;
2445                    Ok(SqlCommand::SetConfig {
2446                        key: full_key,
2447                        value,
2448                    })
2449                } else if self.consume_ident_ci("SECRET")? {
2450                    let key = self.parse_dotted_admin_path(true)?;
2451                    self.expect(Token::Eq)?;
2452                    let value = self.parse_literal_value()?;
2453                    Ok(SqlCommand::SetSecret { key, value })
2454                } else if self.consume_ident_ci("TENANT")? {
2455                    // SET TENANT 'id'  |  SET TENANT = 'id'  |
2456                    // SET TENANT NULL  |  SET TENANT = NULL
2457                    let _ = self.consume(&Token::Eq)?;
2458                    if self.consume_ident_ci("NULL")? {
2459                        Ok(SqlCommand::SetTenant(None))
2460                    } else {
2461                        let value = self.parse_literal_value()?;
2462                        match value {
2463                            Value::Text(s) => Ok(SqlCommand::SetTenant(Some(s.to_string()))),
2464                            Value::Null => Ok(SqlCommand::SetTenant(None)),
2465                            other => Err(ParseError::new(
2466                                format!("SET TENANT expects a text literal or NULL, got {other:?}"),
2467                                self.position(),
2468                            )),
2469                        }
2470                    }
2471                } else {
2472                    Err(ParseError::expected(
2473                        vec!["CONFIG", "SECRET", "TENANT"],
2474                        self.peek(),
2475                        self.position(),
2476                    ))
2477                }
2478            }
2479            Token::Ident(name) if name.eq_ignore_ascii_case("APPLY") => {
2480                self.advance()?;
2481                match self.parse_apply_migration()? {
2482                    QueryExpr::ApplyMigration(q) => Ok(SqlCommand::ApplyMigration(q)),
2483                    other => Err(ParseError::new(
2484                        format!("internal: APPLY MIGRATION produced unexpected kind {other:?}"),
2485                        self.position(),
2486                    )),
2487                }
2488            }
2489            Token::Ident(name) if name.eq_ignore_ascii_case("RESET") => {
2490                // RESET TENANT — session-local clear
2491                self.advance()?;
2492                if self.consume_ident_ci("TENANT")? {
2493                    Ok(SqlCommand::SetTenant(None))
2494                } else {
2495                    Err(ParseError::expected(
2496                        vec!["TENANT"],
2497                        self.peek(),
2498                        self.position(),
2499                    ))
2500                }
2501            }
2502            Token::Ident(name)
2503                if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
2504            {
2505                self.advance()?;
2506                let collection = self.parse_dotted_admin_path(false)?;
2507                let mut query = TableQuery::new("red.describe");
2508                query.filter = Some(Filter::compare(
2509                    FieldRef::column("", "collection"),
2510                    CompareOp::Eq,
2511                    Value::text(collection),
2512                ));
2513                Ok(SqlCommand::Select(query))
2514            }
2515            Token::Desc => {
2516                self.advance()?;
2517                let collection = self.parse_dotted_admin_path(false)?;
2518                let mut query = TableQuery::new("red.describe");
2519                query.filter = Some(Filter::compare(
2520                    FieldRef::column("", "collection"),
2521                    CompareOp::Eq,
2522                    Value::text(collection),
2523                ));
2524                Ok(SqlCommand::Select(query))
2525            }
2526            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
2527                self.advance()?;
2528                if self.consume(&Token::Create)? || self.consume_ident_ci("CREATE")? {
2529                    if !(self.consume(&Token::Table)? || self.consume_ident_ci("TABLE")?) {
2530                        return Err(ParseError::expected(
2531                            vec!["TABLE"],
2532                            self.peek(),
2533                            self.position(),
2534                        ));
2535                    }
2536                    let collection = self.parse_dotted_admin_path(false)?;
2537                    let mut query = TableQuery::new("red.show_create");
2538                    query.filter = Some(Filter::compare(
2539                        FieldRef::column("", "collection"),
2540                        CompareOp::Eq,
2541                        Value::text(collection),
2542                    ));
2543                    Ok(SqlCommand::Select(query))
2544                } else if self.consume_ident_ci("CONFIG")? {
2545                    // Accept dotted prefixes the same way SET CONFIG does
2546                    // (`SHOW CONFIG durability.mode`), and empty prefix
2547                    // (`SHOW CONFIG`) for a catalog-wide listing.
2548                    let prefix = if !self.check(&Token::Eof) {
2549                        let first = self.expect_ident()?;
2550                        let mut full = first;
2551                        while self.consume(&Token::Dot)? {
2552                            let next = self.expect_ident_or_keyword()?;
2553                            full = format!("{full}.{next}");
2554                        }
2555                        // Match SET CONFIG: lowercase so keyword segments
2556                        // come out consistent with the stored keys.
2557                        Some(full.to_ascii_lowercase())
2558                    } else {
2559                        None
2560                    };
2561                    Ok(SqlCommand::ShowConfig { prefix })
2562                } else if self.consume_ident_ci("COLLECTIONS")? {
2563                    let mut query = TableQuery::new("red.collections");
2564                    let include_internal = if self.consume_ident_ci("INCLUDING")? {
2565                        if !self.consume_ident_ci("INTERNAL")? {
2566                            return Err(ParseError::expected(
2567                                vec!["INTERNAL"],
2568                                self.peek(),
2569                                self.position(),
2570                            ));
2571                        }
2572                        true
2573                    } else {
2574                        false
2575                    };
2576                    self.parse_table_clauses(&mut query)?;
2577                    if !include_internal {
2578                        let user_filter = query.filter.take();
2579                        let hide_internal = crate::storage::query::ast::Filter::Compare {
2580                            field: FieldRef::column("", "internal"),
2581                            op: CompareOp::Eq,
2582                            value: Value::Boolean(false),
2583                        };
2584                        query.filter = Some(match user_filter {
2585                            Some(filter) => filter.and(hide_internal),
2586                            None => hide_internal,
2587                        });
2588                    }
2589                    Ok(SqlCommand::Select(query))
2590                } else if self.consume_ident_ci("TABLES")? {
2591                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2592                        self, "table",
2593                    )?))
2594                } else if self.consume_ident_ci("QUEUES")? {
2595                    // Issue #535 — `SHOW QUEUES` desugars to the
2596                    // `red.queues` virtual table (queue-shaped
2597                    // columns), not the filtered `red.collections`
2598                    // view. `INCLUDING INTERNAL` mirrors the
2599                    // `SHOW COLLECTIONS` opt-in: without it, DLQ
2600                    // targets and other auto-created queues are
2601                    // hidden via the `internal = false` filter.
2602                    let mut query = TableQuery::new("red.queues");
2603                    let include_internal = if self.consume_ident_ci("INCLUDING")? {
2604                        if !self.consume_ident_ci("INTERNAL")? {
2605                            return Err(ParseError::expected(
2606                                vec!["INTERNAL"],
2607                                self.peek(),
2608                                self.position(),
2609                            ));
2610                        }
2611                        true
2612                    } else {
2613                        false
2614                    };
2615                    self.parse_table_clauses(&mut query)?;
2616                    if !include_internal {
2617                        let hide_internal = Filter::Compare {
2618                            field: FieldRef::column("", "internal"),
2619                            op: CompareOp::Eq,
2620                            value: Value::Boolean(false),
2621                        };
2622                        add_table_filter(&mut query, hide_internal);
2623                    }
2624                    Ok(SqlCommand::Select(query))
2625                } else if self.consume(&Token::Vectors)? || self.consume_ident_ci("VECTORS")? {
2626                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2627                        self, "vector",
2628                    )?))
2629                } else if self.consume_ident_ci("DOCUMENTS")? {
2630                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2631                        self, "document",
2632                    )?))
2633                } else if self.consume(&Token::Timeseries)?
2634                    || self.consume_ident_ci("TIMESERIES")?
2635                {
2636                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2637                        self,
2638                        "timeseries",
2639                    )?))
2640                } else if self.consume_ident_ci("GRAPHS")? {
2641                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2642                        self, "graph",
2643                    )?))
2644                } else if self.consume_ident_ci("CONFIGS")? {
2645                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2646                        self, "config",
2647                    )?))
2648                } else if self.consume_ident_ci("VAULTS")? {
2649                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2650                        self, "vault",
2651                    )?))
2652                } else if self.consume(&Token::Kv)?
2653                    || self.consume_ident_ci("KV")?
2654                    || self.consume_ident_ci("KVS")?
2655                {
2656                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2657                        self, "kv",
2658                    )?))
2659                } else if self.consume(&Token::Schema)? || self.consume_ident_ci("SCHEMA")? {
2660                    let collection = self.parse_dotted_admin_path(false)?;
2661                    let mut query = TableQuery::new("red.columns");
2662                    query.filter = Some(Filter::compare(
2663                        FieldRef::column("", "collection"),
2664                        CompareOp::Eq,
2665                        Value::text(collection),
2666                    ));
2667                    Ok(SqlCommand::Select(query))
2668                } else if self.consume_ident_ci("INDICES")? || self.consume_ident_ci("INDEXES")? {
2669                    let mut query = TableQuery::new("red.show_indexes");
2670                    if self.consume(&Token::On)? {
2671                        let collection = self.expect_ident_or_keyword()?;
2672                        let filter = Filter::Compare {
2673                            field: FieldRef::column("", "table"),
2674                            op: CompareOp::Eq,
2675                            value: Value::text(collection),
2676                        };
2677                        query.where_expr = Some(filter_to_expr(&filter));
2678                        query.filter = Some(filter);
2679                    }
2680                    self.parse_table_clauses(&mut query)?;
2681                    Ok(SqlCommand::Select(query))
2682                } else if self.consume_ident_ci("POLICIES")? {
2683                    if self.consume(&Token::For)? || self.consume_ident_ci("FOR")? {
2684                        let principal = self.parse_iam_principal_kind()?;
2685                        return Ok(SqlCommand::IamPolicy(QueryExpr::ShowPolicies {
2686                            filter: Some(principal),
2687                        }));
2688                    }
2689                    let mut query = TableQuery::new("red.policies");
2690                    let collection_filter =
2691                        if self.consume(&Token::On)? || self.consume_ident_ci("ON")? {
2692                            let collection = self.parse_dotted_admin_path(false)?;
2693                            Some(Filter::Compare {
2694                                field: FieldRef::TableColumn {
2695                                    table: String::new(),
2696                                    column: "collection".to_string(),
2697                                },
2698                                op: CompareOp::Eq,
2699                                value: Value::text(collection),
2700                            })
2701                        } else {
2702                            None
2703                        };
2704                    self.parse_table_clauses(&mut query)?;
2705                    if let Some(collection_filter) = collection_filter {
2706                        let combined = match query.filter.take() {
2707                            Some(existing) => {
2708                                Filter::And(Box::new(collection_filter), Box::new(existing))
2709                            }
2710                            None => collection_filter,
2711                        };
2712                        query.where_expr = Some(filter_to_expr(&combined));
2713                        query.filter = Some(combined);
2714                    }
2715                    Ok(SqlCommand::Select(query))
2716                } else if self.consume_ident_ci("STATS")? {
2717                    let mut query = TableQuery::new("red.stats");
2718                    let collection = match self.peek().clone() {
2719                        Token::Ident(name) => {
2720                            self.advance()?;
2721                            Some(name)
2722                        }
2723                        Token::String(name) => {
2724                            self.advance()?;
2725                            Some(name)
2726                        }
2727                        _ => None,
2728                    };
2729                    self.parse_table_clauses(&mut query)?;
2730                    if let Some(collection) = collection {
2731                        let filter = Filter::compare(
2732                            FieldRef::column("red.stats", "collection"),
2733                            CompareOp::Eq,
2734                            Value::text(collection),
2735                        );
2736                        let expr = filter_to_expr(&filter);
2737                        query.where_expr = Some(match query.where_expr.take() {
2738                            Some(existing) => Expr::binop(BinOp::And, existing, expr),
2739                            None => expr,
2740                        });
2741                        query.filter = Some(match query.filter.take() {
2742                            Some(existing) => existing.and(filter),
2743                            None => filter,
2744                        });
2745                    }
2746                    Ok(SqlCommand::Select(query))
2747                } else if self.consume_ident_ci("SAMPLE")? {
2748                    let mut query = TableQuery::new(&self.expect_ident()?);
2749                    query.limit = if self.consume(&Token::Limit)? {
2750                        Some(self.parse_integer()? as u64)
2751                    } else {
2752                        Some(10)
2753                    };
2754                    Ok(SqlCommand::Select(query))
2755                } else if self.consume_ident_ci("SECRET")? || self.consume_ident_ci("SECRETS")? {
2756                    let prefix = if !self.check(&Token::Eof) {
2757                        Some(self.parse_dotted_admin_path(true)?)
2758                    } else {
2759                        None
2760                    };
2761                    Ok(SqlCommand::ShowSecrets { prefix })
2762                } else if self.consume_ident_ci("TENANT")? {
2763                    Ok(SqlCommand::ShowTenant)
2764                } else if let Some(expr) = self.parse_show_iam_after_show()? {
2765                    Ok(SqlCommand::IamPolicy(expr))
2766                } else {
2767                    Err(ParseError::expected(
2768                        vec![
2769                            "CONFIG",
2770                            "SECRET",
2771                            "SECRETS",
2772                            "COLLECTIONS",
2773                            "TABLES",
2774                            "QUEUES",
2775                            "VECTORS",
2776                            "DOCUMENTS",
2777                            "TIMESERIES",
2778                            "GRAPHS",
2779                            "KV",
2780                            "SCHEMA",
2781                            "INDICES",
2782                            "INDEXES",
2783                            "SAMPLE",
2784                            "POLICIES",
2785                            "STATS",
2786                            "TENANT",
2787                            "EFFECTIVE",
2788                        ],
2789                        self.peek(),
2790                        self.position(),
2791                    ))
2792                }
2793            }
2794            // Transaction control statements (Phase 1.1 PG parity).
2795            // BEGIN [WORK | TRANSACTION] [ISOLATION LEVEL <mode>]
2796            // START TRANSACTION [ISOLATION LEVEL <mode>]
2797            //
2798            // We only implement SNAPSHOT ISOLATION (our default). We
2799            // accept READ UNCOMMITTED / READ COMMITTED / REPEATABLE
2800            // READ / SNAPSHOT as PG-compatible no-ops, but reject
2801            // SERIALIZABLE outright — the previous behaviour of
2802            // silently degrading to snapshot made the parser
2803            // dishonest. Real SSI (Serializable Snapshot Isolation)
2804            // is tracked as a future milestone.
2805            Token::Begin | Token::Start => {
2806                self.advance()?;
2807                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2808                // Optional ISOLATION LEVEL clause.
2809                if self.consume_ident_ci("ISOLATION")? {
2810                    self.expect(Token::Level)?;
2811                    // The level identifier can span multiple words
2812                    // (READ UNCOMMITTED / READ COMMITTED / REPEATABLE
2813                    // READ). Collect them case-insensitively.
2814                    let mut parts: Vec<String> = Vec::new();
2815                    if self.consume_ident_ci("READ")? {
2816                        parts.push("READ".to_string());
2817                        if self.consume_ident_ci("UNCOMMITTED")? {
2818                            parts.push("UNCOMMITTED".to_string());
2819                        } else if self.consume_ident_ci("COMMITTED")? {
2820                            parts.push("COMMITTED".to_string());
2821                        } else {
2822                            return Err(ParseError::expected(
2823                                vec!["UNCOMMITTED", "COMMITTED"],
2824                                self.peek(),
2825                                self.position(),
2826                            ));
2827                        }
2828                    } else if self.consume_ident_ci("REPEATABLE")? {
2829                        parts.push("REPEATABLE".to_string());
2830                        if !self.consume_ident_ci("READ")? {
2831                            return Err(ParseError::expected(
2832                                vec!["READ"],
2833                                self.peek(),
2834                                self.position(),
2835                            ));
2836                        }
2837                        parts.push("READ".to_string());
2838                    } else if self.consume_ident_ci("SNAPSHOT")? {
2839                        parts.push("SNAPSHOT".to_string());
2840                    } else if self.consume_ident_ci("SERIALIZABLE")? {
2841                        return Err(ParseError::new(
2842                            "ISOLATION LEVEL SERIALIZABLE is not yet supported — reddb \
2843                             currently provides SNAPSHOT ISOLATION (which PG calls \
2844                             REPEATABLE READ). Use REPEATABLE READ / SNAPSHOT / \
2845                             READ COMMITTED, or omit ISOLATION LEVEL for the default."
2846                                .to_string(),
2847                            self.position(),
2848                        ));
2849                    } else {
2850                        return Err(ParseError::expected(
2851                            vec!["READ", "REPEATABLE", "SNAPSHOT", "SERIALIZABLE"],
2852                            self.peek(),
2853                            self.position(),
2854                        ));
2855                    }
2856                    // All accepted modes map to our snapshot engine today.
2857                    let _ = parts;
2858                }
2859                Ok(SqlCommand::TransactionControl(TxnControl::Begin))
2860            }
2861            // COMMIT [WORK | TRANSACTION]
2862            Token::Commit => {
2863                self.advance()?;
2864                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2865                Ok(SqlCommand::TransactionControl(TxnControl::Commit))
2866            }
2867            // ROLLBACK [WORK | TRANSACTION] [TO [SAVEPOINT] name]
2868            // ROLLBACK MIGRATION name
2869            Token::Rollback => {
2870                self.advance()?;
2871                if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
2872                    match self.parse_rollback_migration_after_keyword()? {
2873                        QueryExpr::RollbackMigration(q) => Ok(SqlCommand::RollbackMigration(q)),
2874                        other => Err(ParseError::new(
2875                            format!(
2876                                "internal: ROLLBACK MIGRATION produced unexpected kind {other:?}"
2877                            ),
2878                            self.position(),
2879                        )),
2880                    }
2881                } else {
2882                    let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2883                    if self.consume(&Token::To)? {
2884                        let _ = self.consume(&Token::Savepoint)?;
2885                        let name = self.expect_ident()?;
2886                        Ok(SqlCommand::TransactionControl(
2887                            TxnControl::RollbackToSavepoint(name),
2888                        ))
2889                    } else {
2890                        Ok(SqlCommand::TransactionControl(TxnControl::Rollback))
2891                    }
2892                }
2893            }
2894            // SAVEPOINT name
2895            Token::Savepoint => {
2896                self.advance()?;
2897                let name = self.expect_ident()?;
2898                Ok(SqlCommand::TransactionControl(TxnControl::Savepoint(name)))
2899            }
2900            // RELEASE [SAVEPOINT] name
2901            Token::Release => {
2902                self.advance()?;
2903                let _ = self.consume(&Token::Savepoint)?;
2904                let name = self.expect_ident()?;
2905                Ok(SqlCommand::TransactionControl(
2906                    TxnControl::ReleaseSavepoint(name),
2907                ))
2908            }
2909            // VACUUM [FULL] [table]
2910            Token::Vacuum => {
2911                self.advance()?;
2912                let full = self.consume(&Token::Full)?;
2913                let target = if self.check(&Token::Eof) {
2914                    None
2915                } else {
2916                    Some(self.expect_ident()?)
2917                };
2918                Ok(SqlCommand::Maintenance(MaintenanceCommand::Vacuum {
2919                    target,
2920                    full,
2921                }))
2922            }
2923            // REFRESH MATERIALIZED VIEW name
2924            Token::Refresh => {
2925                self.advance()?;
2926                self.expect(Token::Materialized)?;
2927                self.expect(Token::View)?;
2928                let name = self.expect_ident()?;
2929                Ok(SqlCommand::RefreshMaterializedView(
2930                    RefreshMaterializedViewQuery { name },
2931                ))
2932            }
2933            // ANALYZE [table]
2934            Token::Analyze => {
2935                self.advance()?;
2936                let target = if self.check(&Token::Eof) {
2937                    None
2938                } else {
2939                    Some(self.expect_ident()?)
2940                };
2941                Ok(SqlCommand::Maintenance(MaintenanceCommand::Analyze {
2942                    target,
2943                }))
2944            }
2945            // COPY table FROM 'path' [WITH (...)] [DELIMITER 'x'] [HEADER]
2946            //
2947            // Accepts both PG-style `WITH (FORMAT csv, HEADER true)` and the
2948            // short-form `DELIMITER ',' HEADER`. The only supported format
2949            // today is CSV.
2950            Token::Copy => {
2951                self.advance()?;
2952                let table = self.expect_ident()?;
2953                self.expect(Token::From)?;
2954                let path = self.parse_string()?;
2955
2956                let mut delimiter: Option<char> = None;
2957                let mut has_header = false;
2958                let format = CopyFormat::Csv;
2959
2960                // Optional `WITH (FORMAT csv, HEADER true, DELIMITER ',')` block.
2961                // `WITH` is a reserved keyword token — accept both the keyword
2962                // form and the ident form that non-CTE callers sometimes emit.
2963                if self.consume(&Token::With)? || self.consume_ident_ci("WITH")? {
2964                    self.expect(Token::LParen)?;
2965                    loop {
2966                        if self.consume(&Token::Format)? || self.consume_ident_ci("FORMAT")? {
2967                            let _ = self.consume(&Token::Eq)?;
2968                            // Only CSV for now — accept the ident and move on.
2969                            let _ = self.expect_ident()?;
2970                        } else if self.consume(&Token::Header)? {
2971                            let _ = self.consume(&Token::Eq)?;
2972                            // Accept `HEADER`, `HEADER = true`, `HEADER = false`,
2973                            // or an ident spelling of true/false.
2974                            has_header = match self.peek().clone() {
2975                                Token::True => {
2976                                    self.advance()?;
2977                                    true
2978                                }
2979                                Token::False => {
2980                                    self.advance()?;
2981                                    false
2982                                }
2983                                Token::Ident(ref n) if n.eq_ignore_ascii_case("true") => {
2984                                    self.advance()?;
2985                                    true
2986                                }
2987                                Token::Ident(ref n) if n.eq_ignore_ascii_case("false") => {
2988                                    self.advance()?;
2989                                    false
2990                                }
2991                                _ => true,
2992                            };
2993                        } else if self.consume(&Token::Delimiter)? {
2994                            let _ = self.consume(&Token::Eq)?;
2995                            let s = self.parse_string()?;
2996                            delimiter = s.chars().next();
2997                        } else {
2998                            break;
2999                        }
3000                        if !self.consume(&Token::Comma)? {
3001                            break;
3002                        }
3003                    }
3004                    self.expect(Token::RParen)?;
3005                }
3006
3007                // Short form clauses outside WITH (in either order).
3008                loop {
3009                    if self.consume(&Token::Delimiter)? {
3010                        let s = self.parse_string()?;
3011                        delimiter = s.chars().next();
3012                    } else if self.consume(&Token::Header)? {
3013                        has_header = true;
3014                    } else {
3015                        break;
3016                    }
3017                }
3018
3019                Ok(SqlCommand::CopyFrom(CopyFromQuery {
3020                    table,
3021                    path,
3022                    format,
3023                    delimiter,
3024                    has_header,
3025                }))
3026            }
3027            other => Err(ParseError::expected(
3028                vec![
3029                    "SELECT",
3030                    "FROM",
3031                    "INSERT",
3032                    "UPDATE",
3033                    "DELETE",
3034                    "EXPLAIN",
3035                    "CREATE",
3036                    "DROP",
3037                    "ALTER",
3038                    "SET",
3039                    "SHOW",
3040                    "BEGIN",
3041                    "COMMIT",
3042                    "ROLLBACK",
3043                    "SAVEPOINT",
3044                    "RELEASE",
3045                    "START",
3046                    "VACUUM",
3047                    "ANALYZE",
3048                    "COPY",
3049                    "REFRESH",
3050                    "DESCRIBE",
3051                    "DESC",
3052                ],
3053                other,
3054                self.position(),
3055            )),
3056        }
3057    }
3058}