Skip to main content

reddb_server/storage/query/
sql.rs

1use crate::catalog::CollectionModel;
2use crate::storage::query::ast::{
3    AlterQueueQuery, AlterTableQuery, AlterUserStmt, ApplyMigrationQuery, AskQuery, BinOp,
4    CompareOp, ConfigCommand, CopyFormat, CopyFromQuery, CreateForeignTableQuery, CreateIndexQuery,
5    CreateMigrationQuery, CreatePolicyQuery, CreateQueueQuery, CreateSchemaQuery,
6    CreateSequenceQuery, CreateServerQuery, CreateTableQuery, CreateTimeSeriesQuery,
7    CreateTreeQuery, CreateViewQuery, DeleteQuery, DropCollectionQuery, DropDocumentQuery,
8    DropForeignTableQuery, DropGraphQuery, DropIndexQuery, DropKvQuery, DropPolicyQuery,
9    DropQueueQuery, DropSchemaQuery, DropSequenceQuery, DropServerQuery, DropTableQuery,
10    DropTimeSeriesQuery, DropTreeQuery, DropVectorQuery, DropViewQuery, EventsBackfillQuery,
11    ExplainAlterQuery, ExplainMigrationQuery, Expr, FieldRef, Filter, ForeignColumnDef, GrantStmt,
12    GraphCommand, GraphQuery, HybridQuery, InsertQuery, JoinQuery, KvCommand, MaintenanceCommand,
13    PathQuery, PolicyAction, ProbabilisticCommand, QueryExpr, QueueCommand, QueueSelectQuery,
14    RefreshMaterializedViewQuery, RevokeStmt, RollbackMigrationQuery, SearchCommand, Span,
15    TableQuery, TreeCommand, TruncateQuery, TxnControl, UpdateQuery, VectorQuery,
16};
17use crate::storage::query::parser::{ParseError, Parser, SafeTokenDisplay};
18use crate::storage::query::sql_lowering::filter_to_expr;
19use crate::storage::query::Token;
20use crate::storage::schema::Value;
21
/// Canonical SQL frontend command surface.
///
/// This is the single entrypoint for SQL/RQL-style commands before they are
/// lowered into the broader multi-backend `QueryExpr` space.
///
/// Each variant wraps one category enum; `into_command` flattens the two
/// levels into a single `SqlCommand`.
#[derive(Debug, Clone)]
pub enum SqlStatement {
    /// Read statements (`SqlQuery`: select / join).
    Query(SqlQuery),
    /// Row-changing statements (`SqlMutation`: insert / update / delete).
    Mutation(SqlMutation),
    /// Commands grouped under `SqlSchemaCommand`.
    Schema(SqlSchemaCommand),
    /// Commands grouped under `SqlAdminCommand`.
    Admin(SqlAdminCommand),
}
33
/// Statement type returned by `parse_frontend`.
///
/// Either a SQL statement (`Sql`) or one of the non-SQL query / command
/// forms. Every variant is lowered to a `QueryExpr` by
/// `FrontendStatement::into_query_expr`.
#[derive(Debug, Clone)]
// NOTE(review): the lint is silenced rather than boxing the larger payloads —
// presumably to keep construction sites simple; confirm this is intentional.
#[allow(clippy::large_enum_variant)]
pub enum FrontendStatement {
    /// A SQL statement; lowered through `SqlStatement::into_query_expr`.
    Sql(SqlStatement),
    Graph(GraphQuery),
    GraphCommand(GraphCommand),
    Path(PathQuery),
    Vector(VectorQuery),
    Hybrid(HybridQuery),
    /// Lowered to `QueryExpr::SearchCommand`.
    Search(SearchCommand),
    Ask(AskQuery),
    QueueSelect(QueueSelectQuery),
    QueueCommand(QueueCommand),
    EventsBackfill(EventsBackfillQuery),
    /// Backfill status request for the named collection.
    EventsBackfillStatus { collection: String },
    TreeCommand(TreeCommand),
    ProbabilisticCommand(ProbabilisticCommand),
    KvCommand(KvCommand),
    ConfigCommand(ConfigCommand),
}
54
/// Flat list of every SQL command.
///
/// This is the un-categorised counterpart of `SqlStatement`:
/// `SqlCommand::into_statement` regroups a command into its
/// query / mutation / schema / admin category, and
/// `SqlCommand::into_query_expr` lowers it to a `QueryExpr`.
#[derive(Debug, Clone)]
pub enum SqlCommand {
    // Queries and mutations.
    Select(TableQuery),
    Join(JoinQuery),
    Insert(InsertQuery),
    Update(UpdateQuery),
    Delete(DeleteQuery),
    // Commands that `into_statement` places under `SqlStatement::Schema`.
    ExplainAlter(ExplainAlterQuery),
    CreateTable(CreateTableQuery),
    DropTable(DropTableQuery),
    DropGraph(DropGraphQuery),
    DropVector(DropVectorQuery),
    DropDocument(DropDocumentQuery),
    DropKv(DropKvQuery),
    DropCollection(DropCollectionQuery),
    Truncate(TruncateQuery),
    AlterTable(AlterTableQuery),
    CreateIndex(CreateIndexQuery),
    DropIndex(DropIndexQuery),
    CreateTimeSeries(CreateTimeSeriesQuery),
    DropTimeSeries(DropTimeSeriesQuery),
    CreateQueue(CreateQueueQuery),
    AlterQueue(AlterQueueQuery),
    DropQueue(DropQueueQuery),
    CreateTree(CreateTreeQuery),
    DropTree(DropTreeQuery),
    Probabilistic(ProbabilisticCommand),
    // Commands that `into_statement` places under `SqlStatement::Admin`.
    SetConfig {
        key: String,
        value: Value,
    },
    ShowConfig {
        prefix: Option<String>,
    },
    SetSecret {
        key: String,
        value: Value,
    },
    DeleteSecret {
        key: String,
    },
    ShowSecrets {
        prefix: Option<String>,
    },
    SetTenant(Option<String>),
    ShowTenant,
    TransactionControl(TxnControl),
    Maintenance(MaintenanceCommand),
    // More schema-category commands.
    CreateSchema(CreateSchemaQuery),
    DropSchema(DropSchemaQuery),
    CreateSequence(CreateSequenceQuery),
    DropSequence(DropSequenceQuery),
    CopyFrom(CopyFromQuery),
    CreateView(CreateViewQuery),
    DropView(DropViewQuery),
    RefreshMaterializedView(RefreshMaterializedViewQuery),
    CreatePolicy(CreatePolicyQuery),
    DropPolicy(DropPolicyQuery),
    CreateServer(CreateServerQuery),
    DropServer(DropServerQuery),
    CreateForeignTable(CreateForeignTableQuery),
    DropForeignTable(DropForeignTableQuery),
    /// `GRANT … ON … TO …`
    Grant(GrantStmt),
    /// `REVOKE … ON … FROM …`
    Revoke(RevokeStmt),
    /// `ALTER USER name <attrs>`
    AlterUser(AlterUserStmt),
    /// IAM policy DDL (CREATE POLICY '...' AS '...', DROP POLICY '...',
    /// ATTACH/DETACH POLICY, SHOW POLICIES, SIMULATE, SHOW EFFECTIVE
    /// PERMISSIONS). Stored as a pre-built QueryExpr so the dispatcher
    /// can route the multitude of shapes through a single arm.
    IamPolicy(QueryExpr),
    // Migration commands (schema category).
    CreateMigration(CreateMigrationQuery),
    ApplyMigration(ApplyMigrationQuery),
    RollbackMigration(RollbackMigrationQuery),
    ExplainMigration(ExplainMigrationQuery),
}
133
134fn collection_model_filter(model: &str) -> Filter {
135    Filter::Compare {
136        field: FieldRef::column("", "model"),
137        op: CompareOp::Eq,
138        value: Value::Text(model.to_string().into()),
139    }
140}
141
142fn add_table_filter(query: &mut TableQuery, filter: Filter) {
143    let combined = match query.filter.take() {
144        Some(existing) => existing.and(filter),
145        None => filter,
146    };
147    query.where_expr = Some(filter_to_expr(&combined));
148    query.filter = Some(combined);
149}
150
151fn parse_show_collections_by_model(
152    parser: &mut Parser<'_>,
153    model: &str,
154) -> Result<TableQuery, ParseError> {
155    let mut query = TableQuery::new("red.collections");
156    parser.parse_table_clauses(&mut query)?;
157    add_table_filter(&mut query, collection_model_filter(model));
158    Ok(query)
159}
160
/// Read statements carried by `SqlStatement::Query`.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum SqlQuery {
    /// Single-table query; lowered to `QueryExpr::Table`.
    Select(TableQuery),
    /// Join query; lowered to `QueryExpr::Join`.
    Join(JoinQuery),
}
167
/// Row-changing statements carried by `SqlStatement::Mutation`.
#[derive(Debug, Clone)]
pub enum SqlMutation {
    Insert(InsertQuery),
    Update(UpdateQuery),
    Delete(DeleteQuery),
}
174
/// Commands carried by `SqlStatement::Schema`.
///
/// Besides CREATE / ALTER / DROP style DDL, this category also holds the
/// other commands the frontend groups with it: `Truncate`, `CopyFrom`,
/// `Probabilistic`, `RefreshMaterializedView` and the migration commands.
/// Every variant has a same-named counterpart in `SqlCommand`.
#[derive(Debug, Clone)]
pub enum SqlSchemaCommand {
    ExplainAlter(ExplainAlterQuery),
    CreateTable(CreateTableQuery),
    DropTable(DropTableQuery),
    DropGraph(DropGraphQuery),
    DropVector(DropVectorQuery),
    DropDocument(DropDocumentQuery),
    DropKv(DropKvQuery),
    DropCollection(DropCollectionQuery),
    Truncate(TruncateQuery),
    AlterTable(AlterTableQuery),
    CreateIndex(CreateIndexQuery),
    DropIndex(DropIndexQuery),
    CreateTimeSeries(CreateTimeSeriesQuery),
    DropTimeSeries(DropTimeSeriesQuery),
    CreateQueue(CreateQueueQuery),
    AlterQueue(AlterQueueQuery),
    DropQueue(DropQueueQuery),
    CreateTree(CreateTreeQuery),
    DropTree(DropTreeQuery),
    Probabilistic(ProbabilisticCommand),
    CreateSchema(CreateSchemaQuery),
    DropSchema(DropSchemaQuery),
    CreateSequence(CreateSequenceQuery),
    DropSequence(DropSequenceQuery),
    CopyFrom(CopyFromQuery),
    CreateView(CreateViewQuery),
    DropView(DropViewQuery),
    RefreshMaterializedView(RefreshMaterializedViewQuery),
    CreatePolicy(CreatePolicyQuery),
    DropPolicy(DropPolicyQuery),
    CreateServer(CreateServerQuery),
    DropServer(DropServerQuery),
    CreateForeignTable(CreateForeignTableQuery),
    DropForeignTable(DropForeignTableQuery),
    CreateMigration(CreateMigrationQuery),
    ApplyMigration(ApplyMigrationQuery),
    RollbackMigration(RollbackMigrationQuery),
    ExplainMigration(ExplainMigrationQuery),
}
216
/// Commands carried by `SqlStatement::Admin`: configuration, secrets,
/// tenant selection, transaction control, maintenance and access control.
/// Every variant has a same-named counterpart in `SqlCommand`.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum SqlAdminCommand {
    SetConfig { key: String, value: Value },
    ShowConfig { prefix: Option<String> },
    SetSecret { key: String, value: Value },
    DeleteSecret { key: String },
    ShowSecrets { prefix: Option<String> },
    SetTenant(Option<String>),
    ShowTenant,
    TransactionControl(TxnControl),
    Maintenance(MaintenanceCommand),
    /// `GRANT` statement.
    Grant(GrantStmt),
    /// `REVOKE` statement.
    Revoke(RevokeStmt),
    /// `ALTER USER` statement.
    AlterUser(AlterUserStmt),
    /// IAM policy command, stored as an already-built `QueryExpr`.
    IamPolicy(QueryExpr),
}
234
235impl SqlStatement {
236    pub fn into_command(self) -> SqlCommand {
237        match self {
238            SqlStatement::Query(SqlQuery::Select(query)) => SqlCommand::Select(query),
239            SqlStatement::Query(SqlQuery::Join(query)) => SqlCommand::Join(query),
240            SqlStatement::Mutation(SqlMutation::Insert(query)) => SqlCommand::Insert(query),
241            SqlStatement::Mutation(SqlMutation::Update(query)) => SqlCommand::Update(query),
242            SqlStatement::Mutation(SqlMutation::Delete(query)) => SqlCommand::Delete(query),
243            SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query)) => {
244                SqlCommand::ExplainAlter(query)
245            }
246            SqlStatement::Schema(SqlSchemaCommand::CreateTable(query)) => {
247                SqlCommand::CreateTable(query)
248            }
249            SqlStatement::Schema(SqlSchemaCommand::DropTable(query)) => {
250                SqlCommand::DropTable(query)
251            }
252            SqlStatement::Schema(SqlSchemaCommand::DropGraph(query)) => {
253                SqlCommand::DropGraph(query)
254            }
255            SqlStatement::Schema(SqlSchemaCommand::DropVector(query)) => {
256                SqlCommand::DropVector(query)
257            }
258            SqlStatement::Schema(SqlSchemaCommand::DropDocument(query)) => {
259                SqlCommand::DropDocument(query)
260            }
261            SqlStatement::Schema(SqlSchemaCommand::DropKv(query)) => SqlCommand::DropKv(query),
262            SqlStatement::Schema(SqlSchemaCommand::DropCollection(query)) => {
263                SqlCommand::DropCollection(query)
264            }
265            SqlStatement::Schema(SqlSchemaCommand::Truncate(query)) => SqlCommand::Truncate(query),
266            SqlStatement::Schema(SqlSchemaCommand::AlterTable(query)) => {
267                SqlCommand::AlterTable(query)
268            }
269            SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query)) => {
270                SqlCommand::CreateIndex(query)
271            }
272            SqlStatement::Schema(SqlSchemaCommand::DropIndex(query)) => {
273                SqlCommand::DropIndex(query)
274            }
275            SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query)) => {
276                SqlCommand::CreateTimeSeries(query)
277            }
278            SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query)) => {
279                SqlCommand::DropTimeSeries(query)
280            }
281            SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query)) => {
282                SqlCommand::CreateQueue(query)
283            }
284            SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query)) => {
285                SqlCommand::AlterQueue(query)
286            }
287            SqlStatement::Schema(SqlSchemaCommand::DropQueue(query)) => {
288                SqlCommand::DropQueue(query)
289            }
290            SqlStatement::Schema(SqlSchemaCommand::CreateTree(query)) => {
291                SqlCommand::CreateTree(query)
292            }
293            SqlStatement::Schema(SqlSchemaCommand::DropTree(query)) => SqlCommand::DropTree(query),
294            SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command)) => {
295                SqlCommand::Probabilistic(command)
296            }
297            SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value }) => {
298                SqlCommand::SetConfig { key, value }
299            }
300            SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix }) => {
301                SqlCommand::ShowConfig { prefix }
302            }
303            SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value }) => {
304                SqlCommand::SetSecret { key, value }
305            }
306            SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key }) => {
307                SqlCommand::DeleteSecret { key }
308            }
309            SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix }) => {
310                SqlCommand::ShowSecrets { prefix }
311            }
312            SqlStatement::Admin(SqlAdminCommand::SetTenant(value)) => SqlCommand::SetTenant(value),
313            SqlStatement::Admin(SqlAdminCommand::ShowTenant) => SqlCommand::ShowTenant,
314            SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl)) => {
315                SqlCommand::TransactionControl(ctl)
316            }
317            SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)) => SqlCommand::Maintenance(cmd),
318            SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)) => SqlCommand::CreateSchema(q),
319            SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)) => SqlCommand::DropSchema(q),
320            SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q)) => {
321                SqlCommand::CreateSequence(q)
322            }
323            SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)) => SqlCommand::DropSequence(q),
324            SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)) => SqlCommand::CopyFrom(q),
325            SqlStatement::Schema(SqlSchemaCommand::CreateView(q)) => SqlCommand::CreateView(q),
326            SqlStatement::Schema(SqlSchemaCommand::DropView(q)) => SqlCommand::DropView(q),
327            SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q)) => {
328                SqlCommand::RefreshMaterializedView(q)
329            }
330            SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)) => SqlCommand::CreatePolicy(q),
331            SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)) => SqlCommand::DropPolicy(q),
332            SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)) => SqlCommand::CreateServer(q),
333            SqlStatement::Schema(SqlSchemaCommand::DropServer(q)) => SqlCommand::DropServer(q),
334            SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q)) => {
335                SqlCommand::CreateForeignTable(q)
336            }
337            SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q)) => {
338                SqlCommand::DropForeignTable(q)
339            }
340            SqlStatement::Admin(SqlAdminCommand::Grant(s)) => SqlCommand::Grant(s),
341            SqlStatement::Admin(SqlAdminCommand::Revoke(s)) => SqlCommand::Revoke(s),
342            SqlStatement::Admin(SqlAdminCommand::AlterUser(s)) => SqlCommand::AlterUser(s),
343            SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)) => SqlCommand::IamPolicy(e),
344            SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q)) => {
345                SqlCommand::CreateMigration(q)
346            }
347            SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q)) => {
348                SqlCommand::ApplyMigration(q)
349            }
350            SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q)) => {
351                SqlCommand::RollbackMigration(q)
352            }
353            SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q)) => {
354                SqlCommand::ExplainMigration(q)
355            }
356        }
357    }
358
359    pub fn into_query_expr(self) -> QueryExpr {
360        self.into_command().into_query_expr()
361    }
362}
363
364impl FrontendStatement {
365    pub fn into_query_expr(self) -> QueryExpr {
366        match self {
367            FrontendStatement::Sql(statement) => statement.into_query_expr(),
368            FrontendStatement::Graph(query) => QueryExpr::Graph(query),
369            FrontendStatement::GraphCommand(command) => QueryExpr::GraphCommand(command),
370            FrontendStatement::Path(query) => QueryExpr::Path(query),
371            FrontendStatement::Vector(query) => QueryExpr::Vector(query),
372            FrontendStatement::Hybrid(query) => QueryExpr::Hybrid(query),
373            FrontendStatement::Search(command) => QueryExpr::SearchCommand(command),
374            FrontendStatement::Ask(query) => QueryExpr::Ask(query),
375            FrontendStatement::QueueSelect(query) => QueryExpr::QueueSelect(query),
376            FrontendStatement::QueueCommand(command) => QueryExpr::QueueCommand(command),
377            FrontendStatement::EventsBackfill(query) => QueryExpr::EventsBackfill(query),
378            FrontendStatement::EventsBackfillStatus { collection } => {
379                QueryExpr::EventsBackfillStatus { collection }
380            }
381            FrontendStatement::TreeCommand(command) => QueryExpr::TreeCommand(command),
382            FrontendStatement::ProbabilisticCommand(command) => {
383                QueryExpr::ProbabilisticCommand(command)
384            }
385            FrontendStatement::KvCommand(command) => QueryExpr::KvCommand(command),
386            FrontendStatement::ConfigCommand(command) => QueryExpr::ConfigCommand(command),
387        }
388    }
389}
390
391pub fn parse_frontend(input: &str) -> Result<FrontendStatement, ParseError> {
392    let mut parser = Parser::new(input)?;
393    let statement = parser.parse_frontend_statement()?;
394    if !parser.check(&Token::Eof) {
395        return Err(ParseError::new(
396            // F-05: `Token::Ident` / `Token::String` / `Token::JsonLiteral`
397            // Display arms emit raw user bytes. Render via `{:?}` so
398            // embedded CR/LF/NUL/quotes are escaped before the message
399            // reaches downstream JSON / audit / log / gRPC sinks.
400            format!("Unexpected token after query: {:?}", parser.current.token),
401            parser.position(),
402        ));
403    }
404    Ok(statement)
405}
406
407impl SqlCommand {
408    pub fn into_query_expr(self) -> QueryExpr {
409        match self {
410            SqlCommand::Select(query) => QueryExpr::Table(query),
411            SqlCommand::Join(query) => QueryExpr::Join(query),
412            SqlCommand::Insert(query) => QueryExpr::Insert(query),
413            SqlCommand::Update(query) => QueryExpr::Update(query),
414            SqlCommand::Delete(query) => QueryExpr::Delete(query),
415            SqlCommand::ExplainAlter(query) => QueryExpr::ExplainAlter(query),
416            SqlCommand::CreateTable(query) => QueryExpr::CreateTable(query),
417            SqlCommand::DropTable(query) => QueryExpr::DropTable(query),
418            SqlCommand::DropGraph(query) => QueryExpr::DropGraph(query),
419            SqlCommand::DropVector(query) => QueryExpr::DropVector(query),
420            SqlCommand::DropDocument(query) => QueryExpr::DropDocument(query),
421            SqlCommand::DropKv(query) => QueryExpr::DropKv(query),
422            SqlCommand::DropCollection(query) => QueryExpr::DropCollection(query),
423            SqlCommand::Truncate(query) => QueryExpr::Truncate(query),
424            SqlCommand::AlterTable(query) => QueryExpr::AlterTable(query),
425            SqlCommand::CreateIndex(query) => QueryExpr::CreateIndex(query),
426            SqlCommand::DropIndex(query) => QueryExpr::DropIndex(query),
427            SqlCommand::CreateTimeSeries(query) => QueryExpr::CreateTimeSeries(query),
428            SqlCommand::DropTimeSeries(query) => QueryExpr::DropTimeSeries(query),
429            SqlCommand::CreateQueue(query) => QueryExpr::CreateQueue(query),
430            SqlCommand::AlterQueue(query) => QueryExpr::AlterQueue(query),
431            SqlCommand::DropQueue(query) => QueryExpr::DropQueue(query),
432            SqlCommand::CreateTree(query) => QueryExpr::CreateTree(query),
433            SqlCommand::DropTree(query) => QueryExpr::DropTree(query),
434            SqlCommand::Probabilistic(command) => QueryExpr::ProbabilisticCommand(command),
435            SqlCommand::SetConfig { key, value } => QueryExpr::SetConfig { key, value },
436            SqlCommand::ShowConfig { prefix } => QueryExpr::ShowConfig { prefix },
437            SqlCommand::SetSecret { key, value } => QueryExpr::SetSecret { key, value },
438            SqlCommand::DeleteSecret { key } => QueryExpr::DeleteSecret { key },
439            SqlCommand::ShowSecrets { prefix } => QueryExpr::ShowSecrets { prefix },
440            SqlCommand::SetTenant(value) => QueryExpr::SetTenant(value),
441            SqlCommand::ShowTenant => QueryExpr::ShowTenant,
442            SqlCommand::TransactionControl(ctl) => QueryExpr::TransactionControl(ctl),
443            SqlCommand::Maintenance(cmd) => QueryExpr::MaintenanceCommand(cmd),
444            SqlCommand::CreateSchema(q) => QueryExpr::CreateSchema(q),
445            SqlCommand::DropSchema(q) => QueryExpr::DropSchema(q),
446            SqlCommand::CreateSequence(q) => QueryExpr::CreateSequence(q),
447            SqlCommand::DropSequence(q) => QueryExpr::DropSequence(q),
448            SqlCommand::CopyFrom(q) => QueryExpr::CopyFrom(q),
449            SqlCommand::CreateView(q) => QueryExpr::CreateView(q),
450            SqlCommand::DropView(q) => QueryExpr::DropView(q),
451            SqlCommand::RefreshMaterializedView(q) => QueryExpr::RefreshMaterializedView(q),
452            SqlCommand::CreatePolicy(q) => QueryExpr::CreatePolicy(q),
453            SqlCommand::DropPolicy(q) => QueryExpr::DropPolicy(q),
454            SqlCommand::CreateServer(q) => QueryExpr::CreateServer(q),
455            SqlCommand::DropServer(q) => QueryExpr::DropServer(q),
456            SqlCommand::CreateForeignTable(q) => QueryExpr::CreateForeignTable(q),
457            SqlCommand::DropForeignTable(q) => QueryExpr::DropForeignTable(q),
458            SqlCommand::Grant(s) => QueryExpr::Grant(s),
459            SqlCommand::Revoke(s) => QueryExpr::Revoke(s),
460            SqlCommand::AlterUser(s) => QueryExpr::AlterUser(s),
461            SqlCommand::IamPolicy(e) => e,
462            SqlCommand::CreateMigration(q) => QueryExpr::CreateMigration(q),
463            SqlCommand::ApplyMigration(q) => QueryExpr::ApplyMigration(q),
464            SqlCommand::RollbackMigration(q) => QueryExpr::RollbackMigration(q),
465            SqlCommand::ExplainMigration(q) => QueryExpr::ExplainMigration(q),
466        }
467    }
468
469    pub fn into_statement(self) -> SqlStatement {
470        match self {
471            SqlCommand::Select(query) => SqlStatement::Query(SqlQuery::Select(query)),
472            SqlCommand::Join(query) => SqlStatement::Query(SqlQuery::Join(query)),
473            SqlCommand::Insert(query) => SqlStatement::Mutation(SqlMutation::Insert(query)),
474            SqlCommand::Update(query) => SqlStatement::Mutation(SqlMutation::Update(query)),
475            SqlCommand::Delete(query) => SqlStatement::Mutation(SqlMutation::Delete(query)),
476            SqlCommand::ExplainAlter(query) => {
477                SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query))
478            }
479            SqlCommand::CreateTable(query) => {
480                SqlStatement::Schema(SqlSchemaCommand::CreateTable(query))
481            }
482            SqlCommand::DropTable(query) => {
483                SqlStatement::Schema(SqlSchemaCommand::DropTable(query))
484            }
485            SqlCommand::DropGraph(query) => {
486                SqlStatement::Schema(SqlSchemaCommand::DropGraph(query))
487            }
488            SqlCommand::DropVector(query) => {
489                SqlStatement::Schema(SqlSchemaCommand::DropVector(query))
490            }
491            SqlCommand::DropDocument(query) => {
492                SqlStatement::Schema(SqlSchemaCommand::DropDocument(query))
493            }
494            SqlCommand::DropKv(query) => SqlStatement::Schema(SqlSchemaCommand::DropKv(query)),
495            SqlCommand::DropCollection(query) => {
496                SqlStatement::Schema(SqlSchemaCommand::DropCollection(query))
497            }
498            SqlCommand::Truncate(query) => SqlStatement::Schema(SqlSchemaCommand::Truncate(query)),
499            SqlCommand::AlterTable(query) => {
500                SqlStatement::Schema(SqlSchemaCommand::AlterTable(query))
501            }
502            SqlCommand::CreateIndex(query) => {
503                SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query))
504            }
505            SqlCommand::DropIndex(query) => {
506                SqlStatement::Schema(SqlSchemaCommand::DropIndex(query))
507            }
508            SqlCommand::CreateTimeSeries(query) => {
509                SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query))
510            }
511            SqlCommand::DropTimeSeries(query) => {
512                SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query))
513            }
514            SqlCommand::CreateQueue(query) => {
515                SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query))
516            }
517            SqlCommand::AlterQueue(query) => {
518                SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query))
519            }
520            SqlCommand::DropQueue(query) => {
521                SqlStatement::Schema(SqlSchemaCommand::DropQueue(query))
522            }
523            SqlCommand::CreateTree(query) => {
524                SqlStatement::Schema(SqlSchemaCommand::CreateTree(query))
525            }
526            SqlCommand::DropTree(query) => SqlStatement::Schema(SqlSchemaCommand::DropTree(query)),
527            SqlCommand::Probabilistic(command) => {
528                SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command))
529            }
530            SqlCommand::SetConfig { key, value } => {
531                SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value })
532            }
533            SqlCommand::ShowConfig { prefix } => {
534                SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix })
535            }
536            SqlCommand::SetSecret { key, value } => {
537                SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value })
538            }
539            SqlCommand::DeleteSecret { key } => {
540                SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key })
541            }
542            SqlCommand::ShowSecrets { prefix } => {
543                SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix })
544            }
545            SqlCommand::SetTenant(value) => SqlStatement::Admin(SqlAdminCommand::SetTenant(value)),
546            SqlCommand::ShowTenant => SqlStatement::Admin(SqlAdminCommand::ShowTenant),
547            SqlCommand::TransactionControl(ctl) => {
548                SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl))
549            }
550            SqlCommand::Maintenance(cmd) => SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)),
551            SqlCommand::CreateSchema(q) => SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)),
552            SqlCommand::DropSchema(q) => SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)),
553            SqlCommand::CreateSequence(q) => {
554                SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q))
555            }
556            SqlCommand::DropSequence(q) => SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)),
557            SqlCommand::CopyFrom(q) => SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)),
558            SqlCommand::CreateView(q) => SqlStatement::Schema(SqlSchemaCommand::CreateView(q)),
559            SqlCommand::DropView(q) => SqlStatement::Schema(SqlSchemaCommand::DropView(q)),
560            SqlCommand::RefreshMaterializedView(q) => {
561                SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q))
562            }
563            SqlCommand::CreatePolicy(q) => SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)),
564            SqlCommand::DropPolicy(q) => SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)),
565            SqlCommand::CreateServer(q) => SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)),
566            SqlCommand::DropServer(q) => SqlStatement::Schema(SqlSchemaCommand::DropServer(q)),
567            SqlCommand::CreateForeignTable(q) => {
568                SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q))
569            }
570            SqlCommand::DropForeignTable(q) => {
571                SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q))
572            }
573            SqlCommand::Grant(s) => SqlStatement::Admin(SqlAdminCommand::Grant(s)),
574            SqlCommand::Revoke(s) => SqlStatement::Admin(SqlAdminCommand::Revoke(s)),
575            SqlCommand::AlterUser(s) => SqlStatement::Admin(SqlAdminCommand::AlterUser(s)),
576            SqlCommand::IamPolicy(e) => SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)),
577            SqlCommand::CreateMigration(q) => {
578                SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q))
579            }
580            SqlCommand::ApplyMigration(q) => {
581                SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q))
582            }
583            SqlCommand::RollbackMigration(q) => {
584                SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q))
585            }
586            SqlCommand::ExplainMigration(q) => {
587                SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q))
588            }
589        }
590    }
591}
592
593impl<'a> Parser<'a> {
594    fn parse_events_command(&mut self) -> Result<QueryExpr, ParseError> {
595        self.expect_ident()?; // EVENTS
596        if self.consume_ident_ci("STATUS")? {
597            let mut query = TableQuery::new("red.subscriptions");
598            let collection = match self.peek().clone() {
599                Token::Ident(name) => {
600                    self.advance()?;
601                    Some(name)
602                }
603                Token::String(name) => {
604                    self.advance()?;
605                    Some(name)
606                }
607                _ => None,
608            };
609            self.parse_table_clauses(&mut query)?;
610            if let Some(collection) = collection {
611                let filter = Filter::compare(
612                    FieldRef::column("red.subscriptions", "collection"),
613                    CompareOp::Eq,
614                    Value::text(collection),
615                );
616                let expr = filter_to_expr(&filter);
617                query.where_expr = Some(match query.where_expr.take() {
618                    Some(existing) => Expr::binop(BinOp::And, existing, expr),
619                    None => expr,
620                });
621                query.filter = Some(match query.filter.take() {
622                    Some(existing) => existing.and(filter),
623                    None => filter,
624                });
625            }
626            return Ok(QueryExpr::Table(query));
627        }
628
629        if !self.consume_ident_ci("BACKFILL")? {
630            return Err(ParseError::expected(
631                vec!["BACKFILL", "STATUS"],
632                self.peek(),
633                self.position(),
634            ));
635        }
636
637        if self.consume_ident_ci("STATUS")? {
638            let collection = self.expect_ident()?;
639            return Ok(QueryExpr::EventsBackfillStatus { collection });
640        }
641
642        let collection = self.expect_ident()?;
643        let where_filter = if self.consume(&Token::Where)? {
644            let mut parts = Vec::new();
645            while !self.check(&Token::Eof) && !self.check(&Token::To) {
646                parts.push(self.peek().to_string());
647                self.advance()?;
648            }
649            if parts.is_empty() {
650                return Err(ParseError::expected(
651                    vec!["predicate"],
652                    self.peek(),
653                    self.position(),
654                ));
655            }
656            Some(parts.join(" "))
657        } else {
658            None
659        };
660
661        self.expect(Token::To)?;
662        let target_queue = self.expect_ident()?;
663        let limit = if self.consume(&Token::Limit)? {
664            Some(self.parse_positive_integer("LIMIT")? as u64)
665        } else {
666            None
667        };
668
669        Ok(QueryExpr::EventsBackfill(EventsBackfillQuery {
670            collection,
671            where_filter,
672            target_queue,
673            limit,
674        }))
675    }
676
677    /// Parse an optional `OPTIONS (key 'value', key2 'value2', ...)` clause
678    /// used by Phase 3.2 FDW DDL statements. Returns an empty vec when the
679    /// clause is absent. Values are always single-quoted string literals —
680    /// consistent with PG's generic-options model.
681    pub(crate) fn parse_fdw_options_clause(&mut self) -> Result<Vec<(String, String)>, ParseError> {
682        if !self.consume(&Token::Options)? {
683            return Ok(Vec::new());
684        }
685        self.expect(Token::LParen)?;
686        let mut out: Vec<(String, String)> = Vec::new();
687        loop {
688            // Option keys frequently collide with reserved words
689            // (`path`, `format`, `delimiter`, `header`, …) — accept
690            // the keyword form and lowercase it so downstream
691            // option-name matching stays case-insensitive.
692            let was_ident = matches!(self.peek(), Token::Ident(_));
693            let raw = self.expect_ident_or_keyword()?;
694            let key = if was_ident {
695                raw
696            } else {
697                raw.to_ascii_lowercase()
698            };
699            // Value is a single-quoted string literal.
700            let value = self.parse_string()?;
701            out.push((key, value));
702            if !self.consume(&Token::Comma)? {
703                break;
704            }
705        }
706        self.expect(Token::RParen)?;
707        Ok(out)
708    }
709
710    /// Parse any top-level frontend statement through a single shared surface.
711    pub fn parse_frontend_statement(&mut self) -> Result<FrontendStatement, ParseError> {
712        match self.peek() {
713            Token::Select => match self.parse_select_query()? {
714                QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
715                    SqlQuery::Select(query),
716                ))),
717                QueryExpr::QueueSelect(query) => Ok(FrontendStatement::QueueSelect(query)),
718                other => Err(ParseError::new(
719                    format!("internal: SELECT produced unexpected query kind {other:?}"),
720                    self.position(),
721                )),
722            },
723            Token::From
724            | Token::Insert
725            | Token::Update
726            | Token::Truncate
727            | Token::Explain
728            | Token::Create
729            | Token::Drop
730            | Token::Alter
731            | Token::Set
732            | Token::Begin
733            | Token::Commit
734            | Token::Rollback
735            | Token::Savepoint
736            | Token::Release
737            | Token::Start
738            | Token::Vacuum
739            | Token::Analyze
740            | Token::Copy
741            | Token::Refresh => self.parse_sql_statement().map(FrontendStatement::Sql),
742            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
743                self.parse_sql_statement().map(FrontendStatement::Sql)
744            }
745            Token::Ident(name)
746                if name.eq_ignore_ascii_case("GRANT")
747                    || name.eq_ignore_ascii_case("REVOKE")
748                    || name.eq_ignore_ascii_case("SIMULATE")
749                    || name.eq_ignore_ascii_case("APPLY") =>
750            {
751                self.parse_sql_statement().map(FrontendStatement::Sql)
752            }
753            Token::Ident(name) if name.eq_ignore_ascii_case("WATCH") => {
754                self.advance()?;
755                if matches!(
756                    self.peek(),
757                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
758                ) {
759                    match self.parse_config_watch_after_watch()? {
760                        QueryExpr::ConfigCommand(command) => {
761                            Ok(FrontendStatement::ConfigCommand(command))
762                        }
763                        other => Err(ParseError::new(
764                            format!(
765                                "internal: WATCH CONFIG produced unexpected query kind {other:?}"
766                            ),
767                            self.position(),
768                        )),
769                    }
770                } else if matches!(
771                    self.peek(),
772                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
773                ) {
774                    match self.parse_vault_watch_after_watch()? {
775                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
776                        other => Err(ParseError::new(
777                            format!(
778                                "internal: WATCH VAULT produced unexpected query kind {other:?}"
779                            ),
780                            self.position(),
781                        )),
782                    }
783                } else {
784                    match self.parse_kv_watch(crate::catalog::CollectionModel::Kv)? {
785                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
786                        other => Err(ParseError::new(
787                            format!("internal: WATCH produced unexpected query kind {other:?}"),
788                            self.position(),
789                        )),
790                    }
791                }
792            }
793            Token::List => {
794                self.advance()?;
795                if matches!(
796                    self.peek(),
797                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
798                ) {
799                    match self.parse_config_list_after_list()? {
800                        QueryExpr::ConfigCommand(command) => {
801                            Ok(FrontendStatement::ConfigCommand(command))
802                        }
803                        other => Err(ParseError::new(
804                            format!(
805                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
806                            ),
807                            self.position(),
808                        )),
809                    }
810                } else if matches!(
811                    self.peek(),
812                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
813                ) {
814                    match self.parse_vault_list_after_list()? {
815                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
816                        other => Err(ParseError::new(
817                            format!(
818                                "internal: LIST VAULT produced unexpected query kind {other:?}"
819                            ),
820                            self.position(),
821                        )),
822                    }
823                } else {
824                    Err(ParseError::expected(
825                        vec!["CONFIG", "VAULT"],
826                        self.peek(),
827                        self.position(),
828                    ))
829                }
830            }
831            Token::Ident(name) if name.eq_ignore_ascii_case("LIST") => {
832                self.advance()?;
833                if matches!(
834                    self.peek(),
835                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
836                ) {
837                    match self.parse_config_list_after_list()? {
838                        QueryExpr::ConfigCommand(command) => {
839                            Ok(FrontendStatement::ConfigCommand(command))
840                        }
841                        other => Err(ParseError::new(
842                            format!(
843                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
844                            ),
845                            self.position(),
846                        )),
847                    }
848                } else if matches!(
849                    self.peek(),
850                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
851                ) {
852                    match self.parse_vault_list_after_list()? {
853                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
854                        other => Err(ParseError::new(
855                            format!(
856                                "internal: LIST VAULT produced unexpected query kind {other:?}"
857                            ),
858                            self.position(),
859                        )),
860                    }
861                } else {
862                    Err(ParseError::expected(
863                        vec!["CONFIG", "VAULT"],
864                        self.peek(),
865                        self.position(),
866                    ))
867                }
868            }
869            Token::Ident(name) if name.eq_ignore_ascii_case("INVALIDATE") => {
870                if matches!(
871                    self.peek_next()?,
872                    Token::Ident(next) if next.eq_ignore_ascii_case("CONFIG")
873                ) {
874                    match self.parse_config_command()? {
875                        QueryExpr::ConfigCommand(command) => {
876                            Ok(FrontendStatement::ConfigCommand(command))
877                        }
878                        other => Err(ParseError::new(
879                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
880                            self.position(),
881                        )),
882                    }
883                } else {
884                    self.advance()?;
885                    match self.parse_kv_invalidate_tags_after_invalidate()? {
886                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
887                        other => Err(ParseError::new(
888                            format!(
889                                "internal: INVALIDATE produced unexpected query kind {other:?}"
890                            ),
891                            self.position(),
892                        )),
893                    }
894                }
895            }
896            Token::Attach | Token::Detach => self.parse_sql_statement().map(FrontendStatement::Sql),
897            Token::Match => match self.parse_match_query()? {
898                QueryExpr::Graph(query) => Ok(FrontendStatement::Graph(query)),
899                other => Err(ParseError::new(
900                    format!("internal: MATCH produced unexpected query kind {other:?}"),
901                    self.position(),
902                )),
903            },
904            Token::Path => match self.parse_path_query()? {
905                QueryExpr::Path(query) => Ok(FrontendStatement::Path(query)),
906                other => Err(ParseError::new(
907                    format!("internal: PATH produced unexpected query kind {other:?}"),
908                    self.position(),
909                )),
910            },
911            Token::Vector => match self.parse_vector_query()? {
912                QueryExpr::Vector(query) => Ok(FrontendStatement::Vector(query)),
913                other => Err(ParseError::new(
914                    format!("internal: VECTOR produced unexpected query kind {other:?}"),
915                    self.position(),
916                )),
917            },
918            Token::Hybrid => match self.parse_hybrid_query()? {
919                QueryExpr::Hybrid(query) => Ok(FrontendStatement::Hybrid(query)),
920                other => Err(ParseError::new(
921                    format!("internal: HYBRID produced unexpected query kind {other:?}"),
922                    self.position(),
923                )),
924            },
925            Token::Graph => match self.parse_graph_command()? {
926                QueryExpr::GraphCommand(command) => Ok(FrontendStatement::GraphCommand(command)),
927                other => Err(ParseError::new(
928                    format!("internal: GRAPH produced unexpected query kind {other:?}"),
929                    self.position(),
930                )),
931            },
932            Token::Search => match self.parse_search_command()? {
933                QueryExpr::SearchCommand(command) => Ok(FrontendStatement::Search(command)),
934                other => Err(ParseError::new(
935                    format!("internal: SEARCH produced unexpected query kind {other:?}"),
936                    self.position(),
937                )),
938            },
939            Token::Ident(name) if name.eq_ignore_ascii_case("ASK") => {
940                match self.parse_ask_query()? {
941                    QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
942                    other => Err(ParseError::new(
943                        format!("internal: ASK produced unexpected query kind {other:?}"),
944                        self.position(),
945                    )),
946                }
947            }
948            Token::Ident(name) if name.eq_ignore_ascii_case("UNSEAL") => {
949                match self.parse_unseal_vault_command()? {
950                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
951                    other => Err(ParseError::new(
952                        format!("internal: UNSEAL VAULT produced unexpected query kind {other:?}"),
953                        self.position(),
954                    )),
955                }
956            }
957            Token::Queue => match self.parse_queue_command()? {
958                QueryExpr::QueueCommand(command) => Ok(FrontendStatement::QueueCommand(command)),
959                other => Err(ParseError::new(
960                    format!("internal: QUEUE produced unexpected query kind {other:?}"),
961                    self.position(),
962                )),
963            },
964            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
965                match self.parse_events_command()? {
966                    QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
967                        SqlQuery::Select(query),
968                    ))),
969                    QueryExpr::EventsBackfill(query) => {
970                        Ok(FrontendStatement::EventsBackfill(query))
971                    }
972                    QueryExpr::EventsBackfillStatus { collection } => {
973                        Ok(FrontendStatement::EventsBackfillStatus { collection })
974                    }
975                    other => Err(ParseError::new(
976                        format!("internal: EVENTS produced unexpected query kind {other:?}"),
977                        self.position(),
978                    )),
979                }
980            }
981            Token::Kv => match self.parse_kv_command()? {
982                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
983                other => Err(ParseError::new(
984                    format!("internal: KV produced unexpected query kind {other:?}"),
985                    self.position(),
986                )),
987            },
988            Token::Delete => {
989                if matches!(
990                    self.peek_next()?,
991                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
992                ) {
993                    match self.parse_config_command()? {
994                        QueryExpr::ConfigCommand(command) => {
995                            Ok(FrontendStatement::ConfigCommand(command))
996                        }
997                        other => Err(ParseError::new(
998                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
999                            self.position(),
1000                        )),
1001                    }
1002                } else if matches!(
1003                    self.peek_next()?,
1004                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
1005                ) {
1006                    match self.parse_vault_lifecycle_command()? {
1007                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1008                        other => Err(ParseError::new(
1009                            format!("internal: VAULT produced unexpected query kind {other:?}"),
1010                            self.position(),
1011                        )),
1012                    }
1013                } else {
1014                    self.parse_sql_statement().map(FrontendStatement::Sql)
1015                }
1016            }
1017            Token::Add => match self.parse_config_command()? {
1018                QueryExpr::ConfigCommand(command) => Ok(FrontendStatement::ConfigCommand(command)),
1019                other => Err(ParseError::new(
1020                    format!("internal: CONFIG produced unexpected query kind {other:?}"),
1021                    self.position(),
1022                )),
1023            },
1024            Token::Purge => match self.parse_vault_lifecycle_command()? {
1025                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1026                other => Err(ParseError::new(
1027                    format!("internal: VAULT produced unexpected query kind {other:?}"),
1028                    self.position(),
1029                )),
1030            },
1031            Token::Ident(name)
1032                if name.eq_ignore_ascii_case("PUT")
1033                    || name.eq_ignore_ascii_case("GET")
1034                    || name.eq_ignore_ascii_case("RESOLVE")
1035                    || name.eq_ignore_ascii_case("ROTATE")
1036                    || name.eq_ignore_ascii_case("HISTORY")
1037                    || name.eq_ignore_ascii_case("PURGE")
1038                    || name.eq_ignore_ascii_case("INCR")
1039                    || name.eq_ignore_ascii_case("DECR")
1040                    || name.eq_ignore_ascii_case("INVALIDATE") =>
1041            {
1042                if matches!(
1043                    self.peek_next()?,
1044                    Token::Ident(next) if next.eq_ignore_ascii_case("VAULT")
1045                ) {
1046                    match self.parse_vault_lifecycle_command()? {
1047                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1048                        other => Err(ParseError::new(
1049                            format!("internal: VAULT produced unexpected query kind {other:?}"),
1050                            self.position(),
1051                        )),
1052                    }
1053                } else {
1054                    match self.parse_config_command()? {
1055                        QueryExpr::ConfigCommand(command) => {
1056                            Ok(FrontendStatement::ConfigCommand(command))
1057                        }
1058                        other => Err(ParseError::new(
1059                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1060                            self.position(),
1061                        )),
1062                    }
1063                }
1064            }
1065            Token::Ident(name) if name.eq_ignore_ascii_case("VAULT") => {
1066                match self.parse_vault_command()? {
1067                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1068                    other => Err(ParseError::new(
1069                        format!("internal: VAULT produced unexpected query kind {other:?}"),
1070                        self.position(),
1071                    )),
1072                }
1073            }
1074            Token::Tree => match self.parse_tree_command()? {
1075                QueryExpr::TreeCommand(command) => Ok(FrontendStatement::TreeCommand(command)),
1076                other => Err(ParseError::new(
1077                    format!("internal: TREE produced unexpected query kind {other:?}"),
1078                    self.position(),
1079                )),
1080            },
1081            Token::Ident(name) if name.eq_ignore_ascii_case("HLL") => {
1082                match self.parse_hll_command()? {
1083                    QueryExpr::ProbabilisticCommand(command) => {
1084                        Ok(FrontendStatement::ProbabilisticCommand(command))
1085                    }
1086                    other => Err(ParseError::new(
1087                        format!("internal: HLL produced unexpected query kind {other:?}"),
1088                        self.position(),
1089                    )),
1090                }
1091            }
1092            Token::Ident(name) if name.eq_ignore_ascii_case("SKETCH") => {
1093                match self.parse_sketch_command()? {
1094                    QueryExpr::ProbabilisticCommand(command) => {
1095                        Ok(FrontendStatement::ProbabilisticCommand(command))
1096                    }
1097                    other => Err(ParseError::new(
1098                        format!("internal: SKETCH produced unexpected query kind {other:?}"),
1099                        self.position(),
1100                    )),
1101                }
1102            }
1103            Token::Ident(name) if name.eq_ignore_ascii_case("FILTER") => {
1104                match self.parse_filter_command()? {
1105                    QueryExpr::ProbabilisticCommand(command) => {
1106                        Ok(FrontendStatement::ProbabilisticCommand(command))
1107                    }
1108                    other => Err(ParseError::new(
1109                        format!("internal: FILTER produced unexpected query kind {other:?}"),
1110                        self.position(),
1111                    )),
1112                }
1113            }
1114            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => self
1115                .parse_sql_command()
1116                .map(SqlCommand::into_statement)
1117                .map(FrontendStatement::Sql),
1118            other => Err(ParseError::expected(
1119                vec![
1120                    "SELECT", "MATCH", "PATH", "FROM", "VECTOR", "HYBRID", "INSERT", "UPDATE",
1121                    "DELETE", "TRUNCATE", "CREATE", "DROP", "ALTER", "GRAPH", "SEARCH", "ASK",
1122                    "QUEUE", "EVENTS", "KV", "HLL", "TREE", "SKETCH", "FILTER", "SET", "SHOW",
1123                ],
1124                other,
1125                self.position(),
1126            )),
1127        }
1128    }
1129
1130    /// Parse any SQL/RQL-style command into the canonical SQL frontend IR.
1131    pub fn parse_sql_statement(&mut self) -> Result<SqlStatement, ParseError> {
1132        self.parse_sql_command().map(SqlCommand::into_statement)
1133    }
1134
1135    fn parse_dotted_admin_path(&mut self, lowercase: bool) -> Result<String, ParseError> {
1136        let mut path = self.expect_ident()?;
1137        while self.consume(&Token::Dot)? {
1138            let next = self.expect_ident_or_keyword()?;
1139            path = format!("{path}.{next}");
1140        }
1141        Ok(if lowercase {
1142            path.to_ascii_lowercase()
1143        } else {
1144            path
1145        })
1146    }
1147
1148    /// Parse any SQL/RQL-style command through a single frontend module.
1149    pub fn parse_sql_command(&mut self) -> Result<SqlCommand, ParseError> {
1150        match self.peek() {
1151            Token::Select => match self.parse_select_query()? {
1152                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1153                other => Err(ParseError::new(
1154                    format!("internal: SELECT produced unexpected query kind {other:?}"),
1155                    self.position(),
1156                )),
1157            },
1158            Token::From => match self.parse_from_query()? {
1159                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1160                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1161                other => Err(ParseError::new(
1162                    format!("internal: FROM produced unexpected query kind {other:?}"),
1163                    self.position(),
1164                )),
1165            },
1166            Token::Insert => match self.parse_insert_query()? {
1167                QueryExpr::Insert(query) => Ok(SqlCommand::Insert(query)),
1168                other => Err(ParseError::new(
1169                    format!("internal: INSERT produced unexpected query kind {other:?}"),
1170                    self.position(),
1171                )),
1172            },
1173            Token::Update => match self.parse_update_query()? {
1174                QueryExpr::Update(query) => Ok(SqlCommand::Update(query)),
1175                other => Err(ParseError::new(
1176                    format!("internal: UPDATE produced unexpected query kind {other:?}"),
1177                    self.position(),
1178                )),
1179            },
1180            Token::Delete => {
1181                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("SECRET"))
1182                {
1183                    self.advance()?; // DELETE
1184                    self.advance()?; // SECRET
1185                    let key = self.parse_dotted_admin_path(true)?;
1186                    Ok(SqlCommand::DeleteSecret { key })
1187                } else {
1188                    match self.parse_delete_query()? {
1189                        QueryExpr::Delete(query) => Ok(SqlCommand::Delete(query)),
1190                        other => Err(ParseError::new(
1191                            format!("internal: DELETE produced unexpected query kind {other:?}"),
1192                            self.position(),
1193                        )),
1194                    }
1195                }
1196            }
1197            Token::Truncate => {
1198                self.advance()?;
1199                let model = if self.consume(&Token::Table)? {
1200                    Some(CollectionModel::Table)
1201                } else if self.consume(&Token::Graph)? {
1202                    Some(CollectionModel::Graph)
1203                } else if self.consume(&Token::Vector)? {
1204                    Some(CollectionModel::Vector)
1205                } else if self.consume(&Token::Document)? {
1206                    Some(CollectionModel::Document)
1207                } else if self.consume(&Token::Timeseries)? {
1208                    Some(CollectionModel::TimeSeries)
1209                } else if self.consume(&Token::Kv)? {
1210                    Some(CollectionModel::Kv)
1211                } else if self.consume(&Token::Queue)? {
1212                    Some(CollectionModel::Queue)
1213                } else if self.consume(&Token::Collection)? {
1214                    None
1215                } else {
1216                    return Err(ParseError::expected(
1217                        vec![
1218                            "TABLE",
1219                            "GRAPH",
1220                            "VECTOR",
1221                            "DOCUMENT",
1222                            "TIMESERIES",
1223                            "KV",
1224                            "QUEUE",
1225                            "COLLECTION",
1226                        ],
1227                        self.peek(),
1228                        self.position(),
1229                    ));
1230                };
1231                match self.parse_truncate_body(model)? {
1232                    QueryExpr::Truncate(query) => Ok(SqlCommand::Truncate(query)),
1233                    other => Err(ParseError::new(
1234                        format!("internal: TRUNCATE produced unexpected kind {other:?}"),
1235                        self.position(),
1236                    )),
1237                }
1238            }
1239            Token::Explain => {
1240                // Peek ahead: EXPLAIN MIGRATION name → ExplainMigration
1241                // EXPLAIN ALTER FOR ... → ExplainAlter (existing path)
1242                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
1243                {
1244                    self.advance()?; // consume EXPLAIN
1245                    match self.parse_explain_migration_after_keyword()? {
1246                        QueryExpr::ExplainMigration(q) => Ok(SqlCommand::ExplainMigration(q)),
1247                        other => Err(ParseError::new(
1248                            format!(
1249                                "internal: EXPLAIN MIGRATION produced unexpected kind {other:?}"
1250                            ),
1251                            self.position(),
1252                        )),
1253                    }
1254                } else {
1255                    match self.parse_explain_alter_query()? {
1256                        QueryExpr::ExplainAlter(query) => Ok(SqlCommand::ExplainAlter(query)),
1257                        other => Err(ParseError::new(
1258                            format!("internal: EXPLAIN produced unexpected query kind {other:?}"),
1259                            self.position(),
1260                        )),
1261                    }
1262                }
1263            }
1264            Token::Create => {
1265                let pos = self.position();
1266                self.advance()?;
1267
1268                // CREATE [OR REPLACE] [MATERIALIZED] VIEW [IF NOT EXISTS] name AS <select>
1269                // Detect the VIEW path early so OR REPLACE / MATERIALIZED modifiers
1270                // don't collide with other CREATE variants (TABLE, INDEX, etc.).
1271                let mut or_replace = false;
1272                if self.consume(&Token::Or)? || self.consume_ident_ci("OR")? {
1273                    let _ = self.consume_ident_ci("REPLACE")?;
1274                    or_replace = true;
1275                }
1276                let materialized = self.consume(&Token::Materialized)?;
1277                if self.check(&Token::View) {
1278                    self.advance()?;
1279                    let if_not_exists = self.match_if_not_exists()?;
1280                    let name = self.expect_ident()?;
1281                    // Accept `AS` — the lexer promotes it to `Token::As`
1282                    // (keyword) but some paths still see it as an ident.
1283                    if !self.consume(&Token::As)? && !self.consume_ident_ci("AS")? {
1284                        return Err(ParseError::expected(
1285                            vec!["AS"],
1286                            self.peek(),
1287                            self.position(),
1288                        ));
1289                    }
1290                    // Recursive parse of the body. Any QueryExpr that the
1291                    // rest of the grammar accepts is valid (Select, Join, etc.).
1292                    let body = self.parse_sql_command()?.into_query_expr();
1293                    return Ok(SqlCommand::CreateView(CreateViewQuery {
1294                        name,
1295                        query: Box::new(body),
1296                        materialized,
1297                        if_not_exists,
1298                        or_replace,
1299                    }));
1300                }
1301                // If OR REPLACE / MATERIALIZED was consumed but VIEW was not,
1302                // bail out — no other CREATE form accepts those modifiers.
1303                if or_replace || materialized {
1304                    return Err(ParseError::expected(
1305                        vec!["VIEW"],
1306                        self.peek(),
1307                        self.position(),
1308                    ));
1309                }
1310
1311                if self.check(&Token::Index) || self.check(&Token::Unique) {
1312                    match self.parse_create_index_query()? {
1313                        QueryExpr::CreateIndex(query) => Ok(SqlCommand::CreateIndex(query)),
1314                        other => Err(ParseError::new(
1315                            format!("internal: CREATE INDEX produced unexpected kind {other:?}"),
1316                            self.position(),
1317                        )),
1318                    }
1319                } else if self.check(&Token::Table) {
1320                    self.expect(Token::Table)?;
1321                    match self.parse_create_table_body()? {
1322                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1323                        other => Err(ParseError::new(
1324                            format!("internal: CREATE TABLE produced unexpected kind {other:?}"),
1325                            self.position(),
1326                        )),
1327                    }
1328                } else if self.check(&Token::Kv) {
1329                    self.advance()?;
1330                    match self.parse_create_keyed_body(CollectionModel::Kv)? {
1331                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1332                        other => Err(ParseError::new(
1333                            format!("internal: CREATE KV produced unexpected kind {other:?}"),
1334                            self.position(),
1335                        )),
1336                    }
1337                } else if self.consume_ident_ci("CONFIG")? {
1338                    match self.parse_create_keyed_body(CollectionModel::Config)? {
1339                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1340                        other => Err(ParseError::new(
1341                            format!("internal: CREATE CONFIG produced unexpected kind {other:?}"),
1342                            self.position(),
1343                        )),
1344                    }
1345                } else if self.consume_ident_ci("VAULT")? {
1346                    match self.parse_create_keyed_body(CollectionModel::Vault)? {
1347                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1348                        other => Err(ParseError::new(
1349                            format!("internal: CREATE VAULT produced unexpected kind {other:?}"),
1350                            self.position(),
1351                        )),
1352                    }
1353                } else if self.check(&Token::Timeseries) {
1354                    self.advance()?;
1355                    match self.parse_create_timeseries_body()? {
1356                        QueryExpr::CreateTimeSeries(query) => {
1357                            Ok(SqlCommand::CreateTimeSeries(query))
1358                        }
1359                        other => Err(ParseError::new(
1360                            format!(
1361                                "internal: CREATE TIMESERIES produced unexpected kind {other:?}"
1362                            ),
1363                            self.position(),
1364                        )),
1365                    }
1366                } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
1367                {
1368                    self.advance()?;
1369                    match self.parse_create_hypertable_body()? {
1370                        QueryExpr::CreateTimeSeries(query) => {
1371                            Ok(SqlCommand::CreateTimeSeries(query))
1372                        }
1373                        other => Err(ParseError::new(
1374                            format!(
1375                                "internal: CREATE HYPERTABLE produced unexpected kind {other:?}"
1376                            ),
1377                            self.position(),
1378                        )),
1379                    }
1380                } else if self.check(&Token::Queue) {
1381                    self.advance()?;
1382                    match self.parse_create_queue_body()? {
1383                        QueryExpr::CreateQueue(query) => Ok(SqlCommand::CreateQueue(query)),
1384                        other => Err(ParseError::new(
1385                            format!("internal: CREATE QUEUE produced unexpected kind {other:?}"),
1386                            self.position(),
1387                        )),
1388                    }
1389                } else if self.check(&Token::Tree) {
1390                    self.advance()?;
1391                    match self.parse_create_tree_body()? {
1392                        QueryExpr::CreateTree(query) => Ok(SqlCommand::CreateTree(query)),
1393                        other => Err(ParseError::new(
1394                            format!("internal: CREATE TREE produced unexpected kind {other:?}"),
1395                            self.position(),
1396                        )),
1397                    }
1398                } else if matches!(self.peek(), Token::Ident(n) if
1399                    n.eq_ignore_ascii_case("HLL") ||
1400                    n.eq_ignore_ascii_case("SKETCH") ||
1401                    n.eq_ignore_ascii_case("FILTER"))
1402                {
1403                    match self.parse_create_probabilistic()? {
1404                        QueryExpr::ProbabilisticCommand(command) => {
1405                            Ok(SqlCommand::Probabilistic(command))
1406                        }
1407                        other => Err(ParseError::new(
1408                            format!(
1409                                "internal: CREATE probabilistic produced unexpected kind {other:?}"
1410                            ),
1411                            self.position(),
1412                        )),
1413                    }
1414                } else if self.check(&Token::Schema) {
1415                    // CREATE SCHEMA [IF NOT EXISTS] name
1416                    self.advance()?;
1417                    let if_not_exists = self.match_if_not_exists()?;
1418                    let name = self.expect_ident()?;
1419                    Ok(SqlCommand::CreateSchema(CreateSchemaQuery {
1420                        name,
1421                        if_not_exists,
1422                    }))
1423                } else if self.check(&Token::Policy) {
1424                    // Two forms share the leading `CREATE POLICY` tokens:
1425                    //   * IAM:   CREATE POLICY '<id>' AS '<json>'          (string literal id)
1426                    //   * RLS:   CREATE POLICY <name> ON <target> ...      (bare ident name)
1427                    // Disambiguate by peeking the token after POLICY.
1428                    self.advance()?;
1429                    if matches!(self.peek(), Token::String(_)) {
1430                        // IAM form — short-circuit out of the SQL command stack.
1431                        let expr = self.parse_create_iam_policy_after_keywords()?;
1432                        // Wrap the parsed IAM policy expression in the dedicated
1433                        // `SqlCommand::IamPolicy` variant and return immediately,
1434                        // skipping the RLS clause parsing (ON / FOR / TO / USING)
1435                        // below, which applies only to the bare-identifier form.
1436                        // DROP POLICY '<id>' is wrapped in the same variant.
1437                        return Ok(SqlCommand::IamPolicy(expr));
1438                    }
1439                    let name = self.expect_ident()?;
1440                    self.expect(Token::On)?;
1441
1442                    let (target_kind, table) = {
1443                        use crate::storage::query::ast::PolicyTargetKind;
1444                        let kw = match self.peek() {
1445                            Token::Ident(s) => Some(s.to_ascii_uppercase()),
1446                            _ => None,
1447                        };
1448                        let kind = kw.as_deref().and_then(|k| match k {
1449                            "NODES" => Some(PolicyTargetKind::Nodes),
1450                            "EDGES" => Some(PolicyTargetKind::Edges),
1451                            "VECTORS" => Some(PolicyTargetKind::Vectors),
1452                            "MESSAGES" => Some(PolicyTargetKind::Messages),
1453                            "POINTS" => Some(PolicyTargetKind::Points),
1454                            "DOCUMENTS" => Some(PolicyTargetKind::Documents),
1455                            _ => None,
1456                        });
1457                        if let Some(k) = kind {
1458                            self.advance()?;
1459                            self.expect(Token::Of)?;
1460                            let coll = self.expect_ident()?;
1461                            (k, coll)
1462                        } else {
1463                            let coll = self.expect_ident()?;
1464                            (PolicyTargetKind::Table, coll)
1465                        }
1466                    };
1467
1468                    let action = if self.consume(&Token::For)? {
1469                        let a = match self.peek() {
1470                            Token::Select => {
1471                                self.advance()?;
1472                                Some(PolicyAction::Select)
1473                            }
1474                            Token::Insert => {
1475                                self.advance()?;
1476                                Some(PolicyAction::Insert)
1477                            }
1478                            Token::Update => {
1479                                self.advance()?;
1480                                Some(PolicyAction::Update)
1481                            }
1482                            Token::Delete => {
1483                                self.advance()?;
1484                                Some(PolicyAction::Delete)
1485                            }
1486                            Token::All => {
1487                                self.advance()?;
1488                                None
1489                            }
1490                            _ => None,
1491                        };
1492                        a
1493                    } else {
1494                        None
1495                    };
1496
1497                    let role = if self.consume(&Token::To)? {
1498                        Some(self.expect_ident()?)
1499                    } else {
1500                        None
1501                    };
1502
1503                    self.expect(Token::Using)?;
1504                    self.expect(Token::LParen)?;
1505                    let filter = self.parse_filter()?;
1506                    self.expect(Token::RParen)?;
1507
1508                    Ok(SqlCommand::CreatePolicy(CreatePolicyQuery {
1509                        name,
1510                        table,
1511                        action,
1512                        role,
1513                        using: Box::new(filter),
1514                        target_kind,
1515                    }))
1516                } else if self.check(&Token::Server) {
1517                    // CREATE SERVER [IF NOT EXISTS] name
1518                    //   FOREIGN DATA WRAPPER kind
1519                    //   [OPTIONS (key 'value', ...)]
1520                    self.advance()?;
1521                    let if_not_exists = self.match_if_not_exists()?;
1522                    let name = self.expect_ident()?;
1523                    self.expect(Token::Foreign)?;
1524                    self.expect(Token::Data)?;
1525                    self.expect(Token::Wrapper)?;
1526                    let wrapper = self.expect_ident()?;
1527                    let options = self.parse_fdw_options_clause()?;
1528                    Ok(SqlCommand::CreateServer(CreateServerQuery {
1529                        name,
1530                        wrapper,
1531                        options,
1532                        if_not_exists,
1533                    }))
1534                } else if self.check(&Token::Foreign) {
1535                    // CREATE FOREIGN TABLE [IF NOT EXISTS] name (cols)
1536                    //   SERVER server_name
1537                    //   [OPTIONS (key 'value', ...)]
1538                    self.advance()?;
1539                    self.expect(Token::Table)?;
1540                    let if_not_exists = self.match_if_not_exists()?;
1541                    let name = self.expect_ident()?;
1542                    self.expect(Token::LParen)?;
1543                    let mut columns = Vec::new();
1544                    loop {
1545                        let col_name = self.expect_ident()?;
1546                        let data_type = self.expect_ident_or_keyword()?;
1547                        // Inline NOT NULL check — the CREATE TABLE path's helper is
1548                        // private and coupling to it just for FDW columns isn't worth it.
1549                        let mut not_null = false;
1550                        if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NOT")) {
1551                            self.advance()?;
1552                            if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NULL"))
1553                            {
1554                                self.advance()?;
1555                                not_null = true;
1556                            }
1557                        }
1558                        columns.push(ForeignColumnDef {
1559                            name: col_name,
1560                            data_type,
1561                            not_null,
1562                        });
1563                        if !self.consume(&Token::Comma)? {
1564                            break;
1565                        }
1566                    }
1567                    self.expect(Token::RParen)?;
1568                    self.expect(Token::Server)?;
1569                    let server = self.expect_ident()?;
1570                    let options = self.parse_fdw_options_clause()?;
1571                    Ok(SqlCommand::CreateForeignTable(CreateForeignTableQuery {
1572                        name,
1573                        server,
1574                        columns,
1575                        options,
1576                        if_not_exists,
1577                    }))
1578                } else if self.check(&Token::Sequence) {
1579                    // CREATE SEQUENCE [IF NOT EXISTS] name
1580                    //   [START [WITH] n] [INCREMENT [BY] n]
1581                    self.advance()?;
1582                    let if_not_exists = self.match_if_not_exists()?;
1583                    let name = self.expect_ident()?;
1584                    let mut start: i64 = 1;
1585                    let mut increment: i64 = 1;
1586                    // Loop over optional clauses in any order.
1587                    loop {
1588                        if self.consume(&Token::Start)? {
1589                            // Accept `START 100` or `START WITH 100`.
1590                            let _ = self.consume(&Token::With)? || self.consume_ident_ci("WITH")?;
1591                            start = self.parse_integer()?;
1592                        } else if self.consume(&Token::Increment)? {
1593                            // Accept `INCREMENT 5` or `INCREMENT BY 5`.
1594                            let _ = self.consume(&Token::By)? || self.consume_ident_ci("BY")?;
1595                            increment = self.parse_integer()?;
1596                        } else {
1597                            break;
1598                        }
1599                    }
1600                    Ok(SqlCommand::CreateSequence(CreateSequenceQuery {
1601                        name,
1602                        if_not_exists,
1603                        start,
1604                        increment,
1605                    }))
1606                } else if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
1607                {
1608                    self.advance()?; // consume MIGRATION
1609                    match self.parse_create_migration_body()? {
1610                        QueryExpr::CreateMigration(q) => Ok(SqlCommand::CreateMigration(q)),
1611                        other => Err(ParseError::new(
1612                            format!(
1613                                "internal: CREATE MIGRATION produced unexpected kind {other:?}"
1614                            ),
1615                            self.position(),
1616                        )),
1617                    }
1618                } else {
1619                    Err(ParseError::expected(
1620                        vec![
1621                            "TABLE",
1622                            "GRAPH",
1623                            "VECTOR",
1624                            "DOCUMENT",
1625                            "KV",
1626                            "COLLECTION",
1627                            "INDEX",
1628                            "UNIQUE",
1629                            "TIMESERIES",
1630                            "QUEUE",
1631                            "TREE",
1632                            "HLL",
1633                            "SKETCH",
1634                            "FILTER",
1635                            "SCHEMA",
1636                            "SEQUENCE",
1637                            "MIGRATION",
1638                        ],
1639                        self.peek(),
1640                        pos,
1641                    ))
1642                }
1643            }
1644            Token::Drop => {
1645                let pos = self.position();
1646                self.advance()?;
1647
1648                // DROP [MATERIALIZED] VIEW [IF EXISTS] name
1649                let materialized = self.consume(&Token::Materialized)?;
1650                if self.check(&Token::View) {
1651                    self.advance()?;
1652                    let if_exists = self.match_if_exists()?;
1653                    let name = self.expect_ident()?;
1654                    return Ok(SqlCommand::DropView(DropViewQuery {
1655                        name,
1656                        materialized,
1657                        if_exists,
1658                    }));
1659                }
1660                if materialized {
1661                    return Err(ParseError::expected(
1662                        vec!["VIEW"],
1663                        self.peek(),
1664                        self.position(),
1665                    ));
1666                }
1667
1668                if self.check(&Token::Index) {
1669                    match self.parse_drop_index_query()? {
1670                        QueryExpr::DropIndex(query) => Ok(SqlCommand::DropIndex(query)),
1671                        other => Err(ParseError::new(
1672                            format!("internal: DROP INDEX produced unexpected kind {other:?}"),
1673                            self.position(),
1674                        )),
1675                    }
1676                } else if self.check(&Token::Table) {
1677                    self.expect(Token::Table)?;
1678                    match self.parse_drop_table_body()? {
1679                        QueryExpr::DropTable(query) => Ok(SqlCommand::DropTable(query)),
1680                        other => Err(ParseError::new(
1681                            format!("internal: DROP TABLE produced unexpected kind {other:?}"),
1682                            self.position(),
1683                        )),
1684                    }
1685                } else if self.check(&Token::Graph) {
1686                    self.advance()?;
1687                    match self.parse_drop_graph_body()? {
1688                        QueryExpr::DropGraph(query) => Ok(SqlCommand::DropGraph(query)),
1689                        other => Err(ParseError::new(
1690                            format!("internal: DROP GRAPH produced unexpected kind {other:?}"),
1691                            self.position(),
1692                        )),
1693                    }
1694                } else if self.check(&Token::Vector) {
1695                    self.advance()?;
1696                    match self.parse_drop_vector_body()? {
1697                        QueryExpr::DropVector(query) => Ok(SqlCommand::DropVector(query)),
1698                        other => Err(ParseError::new(
1699                            format!("internal: DROP VECTOR produced unexpected kind {other:?}"),
1700                            self.position(),
1701                        )),
1702                    }
1703                } else if self.check(&Token::Document) {
1704                    self.advance()?;
1705                    match self.parse_drop_document_body()? {
1706                        QueryExpr::DropDocument(query) => Ok(SqlCommand::DropDocument(query)),
1707                        other => Err(ParseError::new(
1708                            format!("internal: DROP DOCUMENT produced unexpected kind {other:?}"),
1709                            self.position(),
1710                        )),
1711                    }
1712                } else if self.check(&Token::Kv) {
1713                    self.advance()?;
1714                    match self.parse_drop_kv_body()? {
1715                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1716                        other => Err(ParseError::new(
1717                            format!("internal: DROP KV produced unexpected kind {other:?}"),
1718                            self.position(),
1719                        )),
1720                    }
1721                } else if self.consume_ident_ci("CONFIG")? {
1722                    match self.parse_drop_keyed_body(CollectionModel::Config)? {
1723                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1724                        other => Err(ParseError::new(
1725                            format!("internal: DROP CONFIG produced unexpected kind {other:?}"),
1726                            self.position(),
1727                        )),
1728                    }
1729                } else if self.consume_ident_ci("VAULT")? {
1730                    match self.parse_drop_keyed_body(CollectionModel::Vault)? {
1731                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1732                        other => Err(ParseError::new(
1733                            format!("internal: DROP VAULT produced unexpected kind {other:?}"),
1734                            self.position(),
1735                        )),
1736                    }
1737                } else if self.check(&Token::Collection) {
1738                    self.advance()?;
1739                    match self.parse_drop_collection_body()? {
1740                        QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
1741                        other => Err(ParseError::new(
1742                            format!("internal: DROP COLLECTION produced unexpected kind {other:?}"),
1743                            self.position(),
1744                        )),
1745                    }
1746                } else if self.check(&Token::Timeseries) {
1747                    self.advance()?;
1748                    match self.parse_drop_timeseries_body()? {
1749                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
1750                        other => Err(ParseError::new(
1751                            format!("internal: DROP TIMESERIES produced unexpected kind {other:?}"),
1752                            self.position(),
1753                        )),
1754                    }
1755                } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
1756                {
1757                    // DROP HYPERTABLE name reuses the same AST as
1758                    // DROP TIMESERIES — runtime clears the registry
1759                    // entry *and* drops the backing collection.
1760                    self.advance()?;
1761                    match self.parse_drop_timeseries_body()? {
1762                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
1763                        other => Err(ParseError::new(
1764                            format!("internal: DROP HYPERTABLE produced unexpected kind {other:?}"),
1765                            self.position(),
1766                        )),
1767                    }
1768                } else if self.check(&Token::Queue) {
1769                    self.advance()?;
1770                    match self.parse_drop_queue_body()? {
1771                        QueryExpr::DropQueue(query) => Ok(SqlCommand::DropQueue(query)),
1772                        other => Err(ParseError::new(
1773                            format!("internal: DROP QUEUE produced unexpected kind {other:?}"),
1774                            self.position(),
1775                        )),
1776                    }
1777                } else if self.check(&Token::Tree) {
1778                    self.advance()?;
1779                    match self.parse_drop_tree_body()? {
1780                        QueryExpr::DropTree(query) => Ok(SqlCommand::DropTree(query)),
1781                        other => Err(ParseError::new(
1782                            format!("internal: DROP TREE produced unexpected kind {other:?}"),
1783                            self.position(),
1784                        )),
1785                    }
1786                } else if matches!(self.peek(), Token::Ident(n) if
1787                    n.eq_ignore_ascii_case("HLL") ||
1788                    n.eq_ignore_ascii_case("SKETCH") ||
1789                    n.eq_ignore_ascii_case("FILTER"))
1790                {
1791                    match self.parse_drop_probabilistic()? {
1792                        QueryExpr::ProbabilisticCommand(command) => {
1793                            Ok(SqlCommand::Probabilistic(command))
1794                        }
1795                        other => Err(ParseError::new(
1796                            format!(
1797                                "internal: DROP probabilistic produced unexpected kind {other:?}"
1798                            ),
1799                            self.position(),
1800                        )),
1801                    }
1802                } else if self.check(&Token::Schema) {
1803                    // DROP SCHEMA [IF EXISTS] name [CASCADE]
1804                    self.advance()?;
1805                    let if_exists = self.match_if_exists()?;
1806                    let name = self.expect_ident()?;
1807                    let cascade = self.consume(&Token::Cascade)?;
1808                    Ok(SqlCommand::DropSchema(DropSchemaQuery {
1809                        name,
1810                        if_exists,
1811                        cascade,
1812                    }))
1813                } else if self.check(&Token::Policy) {
1814                    // Two forms:
1815                    //   * IAM:   DROP POLICY '<id>'
1816                    //   * RLS:   DROP POLICY [IF EXISTS] name ON table
1817                    self.advance()?;
1818                    if matches!(self.peek(), Token::String(_)) {
1819                        let expr = self.parse_drop_iam_policy_after_keywords()?;
1820                        return Ok(SqlCommand::IamPolicy(expr));
1821                    }
1822                    let if_exists = self.match_if_exists()?;
1823                    let name = self.expect_ident()?;
1824                    self.expect(Token::On)?;
1825                    let table = self.expect_ident()?;
1826                    Ok(SqlCommand::DropPolicy(DropPolicyQuery {
1827                        name,
1828                        table,
1829                        if_exists,
1830                    }))
1831                } else if self.check(&Token::Server) {
1832                    // DROP SERVER [IF EXISTS] name [CASCADE]
1833                    self.advance()?;
1834                    let if_exists = self.match_if_exists()?;
1835                    let name = self.expect_ident()?;
1836                    let cascade = self.consume(&Token::Cascade)?;
1837                    Ok(SqlCommand::DropServer(DropServerQuery {
1838                        name,
1839                        if_exists,
1840                        cascade,
1841                    }))
1842                } else if self.check(&Token::Foreign) {
1843                    // DROP FOREIGN TABLE [IF EXISTS] name
1844                    self.advance()?;
1845                    self.expect(Token::Table)?;
1846                    let if_exists = self.match_if_exists()?;
1847                    let name = self.expect_ident()?;
1848                    Ok(SqlCommand::DropForeignTable(DropForeignTableQuery {
1849                        name,
1850                        if_exists,
1851                    }))
1852                } else if self.check(&Token::Sequence) {
1853                    // DROP SEQUENCE [IF EXISTS] name
1854                    self.advance()?;
1855                    let if_exists = self.match_if_exists()?;
1856                    let name = self.expect_ident()?;
1857                    Ok(SqlCommand::DropSequence(DropSequenceQuery {
1858                        name,
1859                        if_exists,
1860                    }))
1861                } else {
1862                    Err(ParseError::expected(
1863                        vec![
1864                            "TABLE",
1865                            "INDEX",
1866                            "TIMESERIES",
1867                            "QUEUE",
1868                            "TREE",
1869                            "HLL",
1870                            "SKETCH",
1871                            "FILTER",
1872                            "SCHEMA",
1873                            "SEQUENCE",
1874                        ],
1875                        self.peek(),
1876                        pos,
1877                    ))
1878                }
1879            }
1880            Token::Alter => {
1881                // Disambiguate ALTER USER / ALTER QUEUE / ALTER TABLE without
1882                // committing to a path until we've seen the target.
1883                // We peek the *next* token (without consuming) and
1884                // dispatch accordingly.
1885                let next = self.peek_next()?.clone();
1886                if matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("USER")) {
1887                    self.advance()?; // consume ALTER
1888                    let stmt = self.parse_alter_user_statement()?;
1889                    Ok(SqlCommand::AlterUser(stmt))
1890                } else if matches!(next, Token::Queue) {
1891                    self.advance()?; // consume ALTER
1892                    self.advance()?; // consume QUEUE
1893                    match self.parse_alter_queue_body()? {
1894                        QueryExpr::AlterQueue(query) => Ok(SqlCommand::AlterQueue(query)),
1895                        other => Err(ParseError::new(
1896                            format!("internal: ALTER QUEUE produced unexpected kind {other:?}"),
1897                            self.position(),
1898                        )),
1899                    }
1900                } else {
1901                    match self.parse_alter_table_query()? {
1902                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
1903                        other => Err(ParseError::new(
1904                            format!("internal: ALTER produced unexpected query kind {other:?}"),
1905                            self.position(),
1906                        )),
1907                    }
1908                }
1909            }
1910            Token::Ident(name) if name.eq_ignore_ascii_case("GRANT") => {
1911                let stmt = self.parse_grant_statement()?;
1912                Ok(SqlCommand::Grant(stmt))
1913            }
1914            Token::Ident(name) if name.eq_ignore_ascii_case("REVOKE") => {
1915                let stmt = self.parse_revoke_statement()?;
1916                Ok(SqlCommand::Revoke(stmt))
1917            }
1918            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
1919                self.advance()?;
1920                if self.consume_ident_ci("BACKFILL")? {
1921                    return Err(ParseError::new(
1922                        "EVENTS BACKFILL STATUS is not implemented; EVENTS BACKFILL runtime is available but durable progress tracking is not"
1923                            .to_string(),
1924                        self.position(),
1925                    ));
1926                }
1927                if !self.consume_ident_ci("STATUS")? {
1928                    return Err(ParseError::expected(
1929                        vec!["STATUS"],
1930                        self.peek(),
1931                        self.position(),
1932                    ));
1933                }
1934
1935                let mut query = TableQuery::new("red.subscriptions");
1936                let collection = match self.peek().clone() {
1937                    Token::Ident(name) => {
1938                        self.advance()?;
1939                        Some(name)
1940                    }
1941                    Token::String(name) => {
1942                        self.advance()?;
1943                        Some(name)
1944                    }
1945                    _ => None,
1946                };
1947                self.parse_table_clauses(&mut query)?;
1948                if let Some(collection) = collection {
1949                    let filter = Filter::compare(
1950                        FieldRef::column("red.subscriptions", "collection"),
1951                        CompareOp::Eq,
1952                        Value::text(collection),
1953                    );
1954                    let expr = filter_to_expr(&filter);
1955                    query.where_expr = Some(match query.where_expr.take() {
1956                        Some(existing) => Expr::binop(BinOp::And, existing, expr),
1957                        None => expr,
1958                    });
1959                    query.filter = Some(match query.filter.take() {
1960                        Some(existing) => existing.and(filter),
1961                        None => filter,
1962                    });
1963                }
1964                Ok(SqlCommand::Select(query))
1965            }
1966            Token::Attach => {
1967                let expr = self.parse_attach_policy()?;
1968                Ok(SqlCommand::IamPolicy(expr))
1969            }
1970            Token::Detach => {
1971                let expr = self.parse_detach_policy()?;
1972                Ok(SqlCommand::IamPolicy(expr))
1973            }
1974            Token::Ident(name) if name.eq_ignore_ascii_case("SIMULATE") => {
1975                let expr = self.parse_simulate_policy()?;
1976                Ok(SqlCommand::IamPolicy(expr))
1977            }
1978            Token::Set => {
1979                self.advance()?;
1980                if self.consume_ident_ci("CONFIG")? {
1981                    let full_key = self.parse_dotted_admin_path(true)?;
1982                    self.expect(Token::Eq)?;
1983                    let value = self.parse_literal_value()?;
1984                    Ok(SqlCommand::SetConfig {
1985                        key: full_key,
1986                        value,
1987                    })
1988                } else if self.consume_ident_ci("SECRET")? {
1989                    let key = self.parse_dotted_admin_path(true)?;
1990                    self.expect(Token::Eq)?;
1991                    let value = self.parse_literal_value()?;
1992                    Ok(SqlCommand::SetSecret { key, value })
1993                } else if self.consume_ident_ci("TENANT")? {
1994                    // SET TENANT 'id'  |  SET TENANT = 'id'  |
1995                    // SET TENANT NULL  |  SET TENANT = NULL
1996                    let _ = self.consume(&Token::Eq)?;
1997                    if self.consume_ident_ci("NULL")? {
1998                        Ok(SqlCommand::SetTenant(None))
1999                    } else {
2000                        let value = self.parse_literal_value()?;
2001                        match value {
2002                            Value::Text(s) => Ok(SqlCommand::SetTenant(Some(s.to_string()))),
2003                            Value::Null => Ok(SqlCommand::SetTenant(None)),
2004                            other => Err(ParseError::new(
2005                                format!("SET TENANT expects a text literal or NULL, got {other:?}"),
2006                                self.position(),
2007                            )),
2008                        }
2009                    }
2010                } else {
2011                    Err(ParseError::expected(
2012                        vec!["CONFIG", "SECRET", "TENANT"],
2013                        self.peek(),
2014                        self.position(),
2015                    ))
2016                }
2017            }
2018            Token::Ident(name) if name.eq_ignore_ascii_case("APPLY") => {
2019                self.advance()?;
2020                match self.parse_apply_migration()? {
2021                    QueryExpr::ApplyMigration(q) => Ok(SqlCommand::ApplyMigration(q)),
2022                    other => Err(ParseError::new(
2023                        format!("internal: APPLY MIGRATION produced unexpected kind {other:?}"),
2024                        self.position(),
2025                    )),
2026                }
2027            }
2028            Token::Ident(name) if name.eq_ignore_ascii_case("RESET") => {
2029                // RESET TENANT — session-local clear
2030                self.advance()?;
2031                if self.consume_ident_ci("TENANT")? {
2032                    Ok(SqlCommand::SetTenant(None))
2033                } else {
2034                    Err(ParseError::expected(
2035                        vec!["TENANT"],
2036                        self.peek(),
2037                        self.position(),
2038                    ))
2039                }
2040            }
2041            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
2042                self.advance()?;
2043                if self.consume_ident_ci("CONFIG")? {
2044                    // Accept dotted prefixes the same way SET CONFIG does
2045                    // (`SHOW CONFIG durability.mode`), and empty prefix
2046                    // (`SHOW CONFIG`) for a catalog-wide listing.
2047                    let prefix = if !self.check(&Token::Eof) {
2048                        let first = self.expect_ident()?;
2049                        let mut full = first;
2050                        while self.consume(&Token::Dot)? {
2051                            let next = self.expect_ident_or_keyword()?;
2052                            full = format!("{full}.{next}");
2053                        }
2054                        // Match SET CONFIG: lowercase so keyword segments
2055                        // come out consistent with the stored keys.
2056                        Some(full.to_ascii_lowercase())
2057                    } else {
2058                        None
2059                    };
2060                    Ok(SqlCommand::ShowConfig { prefix })
2061                } else if self.consume_ident_ci("COLLECTIONS")? {
2062                    let mut query = TableQuery::new("red.collections");
2063                    let include_internal = if self.consume_ident_ci("INCLUDING")? {
2064                        if !self.consume_ident_ci("INTERNAL")? {
2065                            return Err(ParseError::expected(
2066                                vec!["INTERNAL"],
2067                                self.peek(),
2068                                self.position(),
2069                            ));
2070                        }
2071                        true
2072                    } else {
2073                        false
2074                    };
2075                    self.parse_table_clauses(&mut query)?;
2076                    if !include_internal {
2077                        let user_filter = query.filter.take();
2078                        let hide_internal = crate::storage::query::ast::Filter::Compare {
2079                            field: FieldRef::column("", "internal"),
2080                            op: CompareOp::Eq,
2081                            value: Value::Boolean(false),
2082                        };
2083                        query.filter = Some(match user_filter {
2084                            Some(filter) => filter.and(hide_internal),
2085                            None => hide_internal,
2086                        });
2087                    }
2088                    Ok(SqlCommand::Select(query))
2089                } else if self.consume_ident_ci("TABLES")? {
2090                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2091                        self, "table",
2092                    )?))
2093                } else if self.consume_ident_ci("QUEUES")? {
2094                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2095                        self, "queue",
2096                    )?))
2097                } else if self.consume(&Token::Vectors)? || self.consume_ident_ci("VECTORS")? {
2098                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2099                        self, "vector",
2100                    )?))
2101                } else if self.consume_ident_ci("DOCUMENTS")? {
2102                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2103                        self, "document",
2104                    )?))
2105                } else if self.consume(&Token::Timeseries)?
2106                    || self.consume_ident_ci("TIMESERIES")?
2107                {
2108                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2109                        self,
2110                        "timeseries",
2111                    )?))
2112                } else if self.consume_ident_ci("GRAPHS")? {
2113                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2114                        self, "graph",
2115                    )?))
2116                } else if self.consume_ident_ci("CONFIGS")? {
2117                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2118                        self, "config",
2119                    )?))
2120                } else if self.consume_ident_ci("VAULTS")? {
2121                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2122                        self, "vault",
2123                    )?))
2124                } else if self.consume(&Token::Kv)?
2125                    || self.consume_ident_ci("KV")?
2126                    || self.consume_ident_ci("KVS")?
2127                {
2128                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2129                        self, "kv",
2130                    )?))
2131                } else if self.consume(&Token::Schema)? || self.consume_ident_ci("SCHEMA")? {
2132                    let collection = self.parse_dotted_admin_path(false)?;
2133                    let mut query = TableQuery::new("red.columns");
2134                    query.filter = Some(Filter::compare(
2135                        FieldRef::column("", "collection"),
2136                        CompareOp::Eq,
2137                        Value::text(collection),
2138                    ));
2139                    Ok(SqlCommand::Select(query))
2140                } else if self.consume_ident_ci("INDICES")? {
2141                    let mut query = TableQuery::new("red.indices");
2142                    if self.consume(&Token::On)? {
2143                        let collection = self.expect_ident_or_keyword()?;
2144                        let filter = Filter::Compare {
2145                            field: FieldRef::column("red.indices", "collection"),
2146                            op: CompareOp::Eq,
2147                            value: Value::text(collection),
2148                        };
2149                        query.where_expr = Some(filter_to_expr(&filter));
2150                        query.filter = Some(filter);
2151                    }
2152                    self.parse_table_clauses(&mut query)?;
2153                    Ok(SqlCommand::Select(query))
2154                } else if self.consume_ident_ci("POLICIES")? {
2155                    if self.consume(&Token::For)? || self.consume_ident_ci("FOR")? {
2156                        let principal = self.parse_iam_principal_kind()?;
2157                        return Ok(SqlCommand::IamPolicy(QueryExpr::ShowPolicies {
2158                            filter: Some(principal),
2159                        }));
2160                    }
2161                    let mut query = TableQuery::new("red.policies");
2162                    let collection_filter =
2163                        if self.consume(&Token::On)? || self.consume_ident_ci("ON")? {
2164                            let collection = self.parse_dotted_admin_path(false)?;
2165                            Some(Filter::Compare {
2166                                field: FieldRef::TableColumn {
2167                                    table: String::new(),
2168                                    column: "collection".to_string(),
2169                                },
2170                                op: CompareOp::Eq,
2171                                value: Value::text(collection),
2172                            })
2173                        } else {
2174                            None
2175                        };
2176                    self.parse_table_clauses(&mut query)?;
2177                    if let Some(collection_filter) = collection_filter {
2178                        let combined = match query.filter.take() {
2179                            Some(existing) => {
2180                                Filter::And(Box::new(collection_filter), Box::new(existing))
2181                            }
2182                            None => collection_filter,
2183                        };
2184                        query.where_expr = Some(filter_to_expr(&combined));
2185                        query.filter = Some(combined);
2186                    }
2187                    Ok(SqlCommand::Select(query))
2188                } else if self.consume_ident_ci("STATS")? {
2189                    let mut query = TableQuery::new("red.stats");
2190                    let collection = match self.peek().clone() {
2191                        Token::Ident(name) => {
2192                            self.advance()?;
2193                            Some(name)
2194                        }
2195                        Token::String(name) => {
2196                            self.advance()?;
2197                            Some(name)
2198                        }
2199                        _ => None,
2200                    };
2201                    self.parse_table_clauses(&mut query)?;
2202                    if let Some(collection) = collection {
2203                        let filter = Filter::compare(
2204                            FieldRef::column("red.stats", "collection"),
2205                            CompareOp::Eq,
2206                            Value::text(collection),
2207                        );
2208                        let expr = filter_to_expr(&filter);
2209                        query.where_expr = Some(match query.where_expr.take() {
2210                            Some(existing) => Expr::binop(BinOp::And, existing, expr),
2211                            None => expr,
2212                        });
2213                        query.filter = Some(match query.filter.take() {
2214                            Some(existing) => existing.and(filter),
2215                            None => filter,
2216                        });
2217                    }
2218                    Ok(SqlCommand::Select(query))
2219                } else if self.consume_ident_ci("SAMPLE")? {
2220                    let mut query = TableQuery::new(&self.expect_ident()?);
2221                    query.limit = if self.consume(&Token::Limit)? {
2222                        Some(self.parse_integer()? as u64)
2223                    } else {
2224                        Some(10)
2225                    };
2226                    Ok(SqlCommand::Select(query))
2227                } else if self.consume_ident_ci("SECRET")? || self.consume_ident_ci("SECRETS")? {
2228                    let prefix = if !self.check(&Token::Eof) {
2229                        Some(self.parse_dotted_admin_path(true)?)
2230                    } else {
2231                        None
2232                    };
2233                    Ok(SqlCommand::ShowSecrets { prefix })
2234                } else if self.consume_ident_ci("TENANT")? {
2235                    Ok(SqlCommand::ShowTenant)
2236                } else if let Some(expr) = self.parse_show_iam_after_show()? {
2237                    Ok(SqlCommand::IamPolicy(expr))
2238                } else {
2239                    Err(ParseError::expected(
2240                        vec![
2241                            "CONFIG",
2242                            "SECRET",
2243                            "SECRETS",
2244                            "COLLECTIONS",
2245                            "TABLES",
2246                            "QUEUES",
2247                            "VECTORS",
2248                            "DOCUMENTS",
2249                            "TIMESERIES",
2250                            "GRAPHS",
2251                            "KV",
2252                            "SCHEMA",
2253                            "INDICES",
2254                            "SAMPLE",
2255                            "POLICIES",
2256                            "STATS",
2257                            "TENANT",
2258                            "EFFECTIVE",
2259                        ],
2260                        self.peek(),
2261                        self.position(),
2262                    ))
2263                }
2264            }
2265            // Transaction control statements (Phase 1.1 PG parity).
2266            // BEGIN [WORK | TRANSACTION] [ISOLATION LEVEL <mode>]
2267            // START TRANSACTION [ISOLATION LEVEL <mode>]
2268            //
2269            // We only implement SNAPSHOT ISOLATION (our default). We
2270            // accept READ UNCOMMITTED / READ COMMITTED / REPEATABLE
2271            // READ / SNAPSHOT as PG-compatible no-ops, but reject
2272            // SERIALIZABLE outright — the previous behaviour of
2273            // silently degrading to snapshot made the parser
2274            // dishonest. Real SSI (Serializable Snapshot Isolation)
2275            // is tracked as a future milestone.
2276            Token::Begin | Token::Start => {
2277                self.advance()?;
2278                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2279                // Optional ISOLATION LEVEL clause.
2280                if self.consume_ident_ci("ISOLATION")? {
2281                    self.expect(Token::Level)?;
2282                    // The level identifier can span multiple words
2283                    // (READ UNCOMMITTED / READ COMMITTED / REPEATABLE
2284                    // READ). Collect them case-insensitively.
2285                    let mut parts: Vec<String> = Vec::new();
2286                    if self.consume_ident_ci("READ")? {
2287                        parts.push("READ".to_string());
2288                        if self.consume_ident_ci("UNCOMMITTED")? {
2289                            parts.push("UNCOMMITTED".to_string());
2290                        } else if self.consume_ident_ci("COMMITTED")? {
2291                            parts.push("COMMITTED".to_string());
2292                        } else {
2293                            return Err(ParseError::expected(
2294                                vec!["UNCOMMITTED", "COMMITTED"],
2295                                self.peek(),
2296                                self.position(),
2297                            ));
2298                        }
2299                    } else if self.consume_ident_ci("REPEATABLE")? {
2300                        parts.push("REPEATABLE".to_string());
2301                        if !self.consume_ident_ci("READ")? {
2302                            return Err(ParseError::expected(
2303                                vec!["READ"],
2304                                self.peek(),
2305                                self.position(),
2306                            ));
2307                        }
2308                        parts.push("READ".to_string());
2309                    } else if self.consume_ident_ci("SNAPSHOT")? {
2310                        parts.push("SNAPSHOT".to_string());
2311                    } else if self.consume_ident_ci("SERIALIZABLE")? {
2312                        return Err(ParseError::new(
2313                            "ISOLATION LEVEL SERIALIZABLE is not yet supported — reddb \
2314                             currently provides SNAPSHOT ISOLATION (which PG calls \
2315                             REPEATABLE READ). Use REPEATABLE READ / SNAPSHOT / \
2316                             READ COMMITTED, or omit ISOLATION LEVEL for the default."
2317                                .to_string(),
2318                            self.position(),
2319                        ));
2320                    } else {
2321                        return Err(ParseError::expected(
2322                            vec!["READ", "REPEATABLE", "SNAPSHOT", "SERIALIZABLE"],
2323                            self.peek(),
2324                            self.position(),
2325                        ));
2326                    }
2327                    // All accepted modes map to our snapshot engine today.
2328                    let _ = parts;
2329                }
2330                Ok(SqlCommand::TransactionControl(TxnControl::Begin))
2331            }
2332            // COMMIT [WORK | TRANSACTION]
2333            Token::Commit => {
2334                self.advance()?;
2335                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2336                Ok(SqlCommand::TransactionControl(TxnControl::Commit))
2337            }
2338            // ROLLBACK [WORK | TRANSACTION] [TO [SAVEPOINT] name]
2339            // ROLLBACK MIGRATION name
2340            Token::Rollback => {
2341                self.advance()?;
2342                if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
2343                    match self.parse_rollback_migration_after_keyword()? {
2344                        QueryExpr::RollbackMigration(q) => Ok(SqlCommand::RollbackMigration(q)),
2345                        other => Err(ParseError::new(
2346                            format!(
2347                                "internal: ROLLBACK MIGRATION produced unexpected kind {other:?}"
2348                            ),
2349                            self.position(),
2350                        )),
2351                    }
2352                } else {
2353                    let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2354                    if self.consume(&Token::To)? {
2355                        let _ = self.consume(&Token::Savepoint)?;
2356                        let name = self.expect_ident()?;
2357                        Ok(SqlCommand::TransactionControl(
2358                            TxnControl::RollbackToSavepoint(name),
2359                        ))
2360                    } else {
2361                        Ok(SqlCommand::TransactionControl(TxnControl::Rollback))
2362                    }
2363                }
2364            }
2365            // SAVEPOINT name
2366            Token::Savepoint => {
2367                self.advance()?;
2368                let name = self.expect_ident()?;
2369                Ok(SqlCommand::TransactionControl(TxnControl::Savepoint(name)))
2370            }
2371            // RELEASE [SAVEPOINT] name
2372            Token::Release => {
2373                self.advance()?;
2374                let _ = self.consume(&Token::Savepoint)?;
2375                let name = self.expect_ident()?;
2376                Ok(SqlCommand::TransactionControl(
2377                    TxnControl::ReleaseSavepoint(name),
2378                ))
2379            }
2380            // VACUUM [FULL] [table]
2381            Token::Vacuum => {
2382                self.advance()?;
2383                let full = self.consume(&Token::Full)?;
2384                let target = if self.check(&Token::Eof) {
2385                    None
2386                } else {
2387                    Some(self.expect_ident()?)
2388                };
2389                Ok(SqlCommand::Maintenance(MaintenanceCommand::Vacuum {
2390                    target,
2391                    full,
2392                }))
2393            }
2394            // REFRESH MATERIALIZED VIEW name
2395            Token::Refresh => {
2396                self.advance()?;
2397                self.expect(Token::Materialized)?;
2398                self.expect(Token::View)?;
2399                let name = self.expect_ident()?;
2400                Ok(SqlCommand::RefreshMaterializedView(
2401                    RefreshMaterializedViewQuery { name },
2402                ))
2403            }
2404            // ANALYZE [table]
2405            Token::Analyze => {
2406                self.advance()?;
2407                let target = if self.check(&Token::Eof) {
2408                    None
2409                } else {
2410                    Some(self.expect_ident()?)
2411                };
2412                Ok(SqlCommand::Maintenance(MaintenanceCommand::Analyze {
2413                    target,
2414                }))
2415            }
2416            // COPY table FROM 'path' [WITH (...)] [DELIMITER 'x'] [HEADER]
2417            //
2418            // Accepts both PG-style `WITH (FORMAT csv, HEADER true)` and the
2419            // short-form `DELIMITER ',' HEADER` (a bare flag; `true`/`false` is
2420            // only parsed inside `WITH (...)`). The only supported format today is CSV.
2421            Token::Copy => {
2422                self.advance()?;
2423                let table = self.expect_ident()?;
2424                self.expect(Token::From)?;
2425                let path = self.parse_string()?;
2426
2427                let mut delimiter: Option<char> = None;
2428                let mut has_header = false;
2429                let format = CopyFormat::Csv;
2430
2431                // Optional `WITH (FORMAT csv, HEADER true, DELIMITER ',')` block.
2432                // `WITH` is a reserved keyword token — accept both the keyword
2433                // form and the ident form that non-CTE callers sometimes emit.
2434                if self.consume(&Token::With)? || self.consume_ident_ci("WITH")? {
2435                    self.expect(Token::LParen)?;
2436                    loop {
2437                        if self.consume(&Token::Format)? || self.consume_ident_ci("FORMAT")? {
2438                            let _ = self.consume(&Token::Eq)?;
2439                            // Only CSV for now — accept the ident and move on.
2440                            let _ = self.expect_ident()?;
2441                        } else if self.consume(&Token::Header)? {
2442                            let _ = self.consume(&Token::Eq)?;
2443                            // Accept `HEADER`, `HEADER = true`, `HEADER = false`,
2444                            // or an ident spelling of true/false.
2445                            has_header = match self.peek().clone() {
2446                                Token::True => {
2447                                    self.advance()?;
2448                                    true
2449                                }
2450                                Token::False => {
2451                                    self.advance()?;
2452                                    false
2453                                }
2454                                Token::Ident(ref n) if n.eq_ignore_ascii_case("true") => {
2455                                    self.advance()?;
2456                                    true
2457                                }
2458                                Token::Ident(ref n) if n.eq_ignore_ascii_case("false") => {
2459                                    self.advance()?;
2460                                    false
2461                                }
2462                                _ => true,
2463                            };
2464                        } else if self.consume(&Token::Delimiter)? {
2465                            let _ = self.consume(&Token::Eq)?;
2466                            let s = self.parse_string()?;
2467                            delimiter = s.chars().next();
2468                        } else {
2469                            break;
2470                        }
2471                        if !self.consume(&Token::Comma)? {
2472                            break;
2473                        }
2474                    }
2475                    self.expect(Token::RParen)?;
2476                }
2477
2478                // Short form clauses outside WITH (in either order).
2479                loop {
2480                    if self.consume(&Token::Delimiter)? {
2481                        let s = self.parse_string()?;
2482                        delimiter = s.chars().next();
2483                    } else if self.consume(&Token::Header)? {
2484                        has_header = true;
2485                    } else {
2486                        break;
2487                    }
2488                }
2489
2490                Ok(SqlCommand::CopyFrom(CopyFromQuery {
2491                    table,
2492                    path,
2493                    format,
2494                    delimiter,
2495                    has_header,
2496                }))
2497            }
2498            other => Err(ParseError::expected(
2499                vec![
2500                    "SELECT",
2501                    "FROM",
2502                    "INSERT",
2503                    "UPDATE",
2504                    "DELETE",
2505                    "EXPLAIN",
2506                    "CREATE",
2507                    "DROP",
2508                    "ALTER",
2509                    "SET",
2510                    "SHOW",
2511                    "BEGIN",
2512                    "COMMIT",
2513                    "ROLLBACK",
2514                    "SAVEPOINT",
2515                    "RELEASE",
2516                    "START",
2517                    "VACUUM",
2518                    "ANALYZE",
2519                    "COPY",
2520                    "REFRESH",
2521                ],
2522                other,
2523                self.position(),
2524            )),
2525        }
2526    }
2527}