Skip to main content

reddb_server/storage/query/
sql.rs

1use crate::catalog::CollectionModel;
2use crate::storage::query::ast::{
3    AlterQueueQuery, AlterTableQuery, AlterUserStmt, ApplyMigrationQuery, AskQuery, BinOp,
4    CompareOp, ConfigCommand, CopyFormat, CopyFromQuery, CreateCollectionQuery,
5    CreateForeignTableQuery, CreateIndexQuery, CreateMigrationQuery, CreatePolicyQuery,
6    CreateQueueQuery, CreateSchemaQuery, CreateSequenceQuery, CreateServerQuery, CreateTableQuery,
7    CreateTimeSeriesQuery, CreateTreeQuery, CreateVectorQuery, CreateViewQuery, DeleteQuery,
8    DropCollectionQuery, DropDocumentQuery, DropForeignTableQuery, DropGraphQuery, DropIndexQuery,
9    DropKvQuery, DropPolicyQuery, DropQueueQuery, DropSchemaQuery, DropSequenceQuery,
10    DropServerQuery, DropTableQuery, DropTimeSeriesQuery, DropTreeQuery, DropVectorQuery,
11    DropViewQuery, EventsBackfillQuery, ExplainAlterQuery, ExplainMigrationQuery, Expr, FieldRef,
12    Filter, ForeignColumnDef, GrantStmt, GraphCommand, GraphQuery, HybridQuery, InsertQuery,
13    JoinQuery, KvCommand, MaintenanceCommand, PathQuery, PolicyAction, ProbabilisticCommand,
14    QueryExpr, QueueCommand, QueueSelectQuery, RefreshMaterializedViewQuery, RevokeStmt,
15    RollbackMigrationQuery, SearchCommand, Span, TableQuery, TreeCommand, TruncateQuery,
16    TxnControl, UpdateQuery, VectorQuery,
17};
18use crate::storage::query::parser::{ParseError, Parser, SafeTokenDisplay};
19use crate::storage::query::sql_lowering::filter_to_expr;
20use crate::storage::query::Token;
21use crate::storage::schema::Value;
22
23/// Canonical SQL frontend command surface.
24///
25/// This is the single entrypoint for SQL/RQL-style commands before they are
26/// lowered into the broader multi-backend `QueryExpr` space.
27#[derive(Debug, Clone)]
28pub enum SqlStatement {
29    Query(SqlQuery),
30    Mutation(SqlMutation),
31    Schema(SqlSchemaCommand),
32    Admin(SqlAdminCommand),
33}
34
35#[derive(Debug, Clone)]
36#[allow(clippy::large_enum_variant)]
37pub enum FrontendStatement {
38    Sql(SqlStatement),
39    Graph(GraphQuery),
40    GraphCommand(GraphCommand),
41    Path(PathQuery),
42    Vector(VectorQuery),
43    Hybrid(HybridQuery),
44    Search(SearchCommand),
45    Ask(AskQuery),
46    QueueSelect(QueueSelectQuery),
47    QueueCommand(QueueCommand),
48    EventsBackfill(EventsBackfillQuery),
49    EventsBackfillStatus { collection: String },
50    TreeCommand(TreeCommand),
51    ProbabilisticCommand(ProbabilisticCommand),
52    KvCommand(KvCommand),
53    ConfigCommand(ConfigCommand),
54}
55
56#[derive(Debug, Clone)]
57pub enum SqlCommand {
58    Select(TableQuery),
59    Join(JoinQuery),
60    Insert(InsertQuery),
61    Update(UpdateQuery),
62    Delete(DeleteQuery),
63    ExplainAlter(ExplainAlterQuery),
64    CreateTable(CreateTableQuery),
65    CreateCollection(CreateCollectionQuery),
66    CreateVector(CreateVectorQuery),
67    DropTable(DropTableQuery),
68    DropGraph(DropGraphQuery),
69    DropVector(DropVectorQuery),
70    DropDocument(DropDocumentQuery),
71    DropKv(DropKvQuery),
72    DropCollection(DropCollectionQuery),
73    Truncate(TruncateQuery),
74    AlterTable(AlterTableQuery),
75    CreateIndex(CreateIndexQuery),
76    DropIndex(DropIndexQuery),
77    CreateTimeSeries(CreateTimeSeriesQuery),
78    DropTimeSeries(DropTimeSeriesQuery),
79    CreateQueue(CreateQueueQuery),
80    AlterQueue(AlterQueueQuery),
81    DropQueue(DropQueueQuery),
82    CreateTree(CreateTreeQuery),
83    DropTree(DropTreeQuery),
84    Probabilistic(ProbabilisticCommand),
85    SetConfig {
86        key: String,
87        value: Value,
88    },
89    ShowConfig {
90        prefix: Option<String>,
91    },
92    SetSecret {
93        key: String,
94        value: Value,
95    },
96    DeleteSecret {
97        key: String,
98    },
99    ShowSecrets {
100        prefix: Option<String>,
101    },
102    SetTenant(Option<String>),
103    ShowTenant,
104    TransactionControl(TxnControl),
105    Maintenance(MaintenanceCommand),
106    CreateSchema(CreateSchemaQuery),
107    DropSchema(DropSchemaQuery),
108    CreateSequence(CreateSequenceQuery),
109    DropSequence(DropSequenceQuery),
110    CopyFrom(CopyFromQuery),
111    CreateView(CreateViewQuery),
112    DropView(DropViewQuery),
113    RefreshMaterializedView(RefreshMaterializedViewQuery),
114    CreatePolicy(CreatePolicyQuery),
115    DropPolicy(DropPolicyQuery),
116    CreateServer(CreateServerQuery),
117    DropServer(DropServerQuery),
118    CreateForeignTable(CreateForeignTableQuery),
119    DropForeignTable(DropForeignTableQuery),
120    /// `GRANT … ON … TO …`
121    Grant(GrantStmt),
122    /// `REVOKE … ON … FROM …`
123    Revoke(RevokeStmt),
124    /// `ALTER USER name <attrs>`
125    AlterUser(AlterUserStmt),
126    /// IAM policy DDL (CREATE POLICY '...' AS '...', DROP POLICY '...',
127    /// ATTACH/DETACH POLICY, SHOW POLICIES, SIMULATE, SHOW EFFECTIVE
128    /// PERMISSIONS). Stored as a pre-built QueryExpr so the dispatcher
129    /// can route the multitude of shapes through a single arm.
130    IamPolicy(QueryExpr),
131    CreateMigration(CreateMigrationQuery),
132    ApplyMigration(ApplyMigrationQuery),
133    RollbackMigration(RollbackMigrationQuery),
134    ExplainMigration(ExplainMigrationQuery),
135}
136
137fn collection_model_filter(model: &str) -> Filter {
138    Filter::Compare {
139        field: FieldRef::column("", "model"),
140        op: CompareOp::Eq,
141        value: Value::Text(model.to_string().into()),
142    }
143}
144
145fn add_table_filter(query: &mut TableQuery, filter: Filter) {
146    let combined = match query.filter.take() {
147        Some(existing) => existing.and(filter),
148        None => filter,
149    };
150    query.where_expr = Some(filter_to_expr(&combined));
151    query.filter = Some(combined);
152}
153
154fn parse_show_collections_by_model(
155    parser: &mut Parser<'_>,
156    model: &str,
157) -> Result<TableQuery, ParseError> {
158    let mut query = TableQuery::new("red.collections");
159    parser.parse_table_clauses(&mut query)?;
160    add_table_filter(&mut query, collection_model_filter(model));
161    Ok(query)
162}
163
164#[derive(Debug, Clone)]
165#[allow(clippy::large_enum_variant)]
166pub enum SqlQuery {
167    Select(TableQuery),
168    Join(JoinQuery),
169}
170
171#[derive(Debug, Clone)]
172pub enum SqlMutation {
173    Insert(InsertQuery),
174    Update(UpdateQuery),
175    Delete(DeleteQuery),
176}
177
178#[derive(Debug, Clone)]
179pub enum SqlSchemaCommand {
180    ExplainAlter(ExplainAlterQuery),
181    CreateTable(CreateTableQuery),
182    CreateCollection(CreateCollectionQuery),
183    CreateVector(CreateVectorQuery),
184    DropTable(DropTableQuery),
185    DropGraph(DropGraphQuery),
186    DropVector(DropVectorQuery),
187    DropDocument(DropDocumentQuery),
188    DropKv(DropKvQuery),
189    DropCollection(DropCollectionQuery),
190    Truncate(TruncateQuery),
191    AlterTable(AlterTableQuery),
192    CreateIndex(CreateIndexQuery),
193    DropIndex(DropIndexQuery),
194    CreateTimeSeries(CreateTimeSeriesQuery),
195    DropTimeSeries(DropTimeSeriesQuery),
196    CreateQueue(CreateQueueQuery),
197    AlterQueue(AlterQueueQuery),
198    DropQueue(DropQueueQuery),
199    CreateTree(CreateTreeQuery),
200    DropTree(DropTreeQuery),
201    Probabilistic(ProbabilisticCommand),
202    CreateSchema(CreateSchemaQuery),
203    DropSchema(DropSchemaQuery),
204    CreateSequence(CreateSequenceQuery),
205    DropSequence(DropSequenceQuery),
206    CopyFrom(CopyFromQuery),
207    CreateView(CreateViewQuery),
208    DropView(DropViewQuery),
209    RefreshMaterializedView(RefreshMaterializedViewQuery),
210    CreatePolicy(CreatePolicyQuery),
211    DropPolicy(DropPolicyQuery),
212    CreateServer(CreateServerQuery),
213    DropServer(DropServerQuery),
214    CreateForeignTable(CreateForeignTableQuery),
215    DropForeignTable(DropForeignTableQuery),
216    CreateMigration(CreateMigrationQuery),
217    ApplyMigration(ApplyMigrationQuery),
218    RollbackMigration(RollbackMigrationQuery),
219    ExplainMigration(ExplainMigrationQuery),
220}
221
222#[derive(Debug, Clone)]
223#[allow(clippy::large_enum_variant)]
224pub enum SqlAdminCommand {
225    SetConfig { key: String, value: Value },
226    ShowConfig { prefix: Option<String> },
227    SetSecret { key: String, value: Value },
228    DeleteSecret { key: String },
229    ShowSecrets { prefix: Option<String> },
230    SetTenant(Option<String>),
231    ShowTenant,
232    TransactionControl(TxnControl),
233    Maintenance(MaintenanceCommand),
234    Grant(GrantStmt),
235    Revoke(RevokeStmt),
236    AlterUser(AlterUserStmt),
237    IamPolicy(QueryExpr),
238}
239
240impl SqlStatement {
241    pub fn into_command(self) -> SqlCommand {
242        match self {
243            SqlStatement::Query(SqlQuery::Select(query)) => SqlCommand::Select(query),
244            SqlStatement::Query(SqlQuery::Join(query)) => SqlCommand::Join(query),
245            SqlStatement::Mutation(SqlMutation::Insert(query)) => SqlCommand::Insert(query),
246            SqlStatement::Mutation(SqlMutation::Update(query)) => SqlCommand::Update(query),
247            SqlStatement::Mutation(SqlMutation::Delete(query)) => SqlCommand::Delete(query),
248            SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query)) => {
249                SqlCommand::ExplainAlter(query)
250            }
251            SqlStatement::Schema(SqlSchemaCommand::CreateTable(query)) => {
252                SqlCommand::CreateTable(query)
253            }
254            SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query)) => {
255                SqlCommand::CreateCollection(query)
256            }
257            SqlStatement::Schema(SqlSchemaCommand::CreateVector(query)) => {
258                SqlCommand::CreateVector(query)
259            }
260            SqlStatement::Schema(SqlSchemaCommand::DropTable(query)) => {
261                SqlCommand::DropTable(query)
262            }
263            SqlStatement::Schema(SqlSchemaCommand::DropGraph(query)) => {
264                SqlCommand::DropGraph(query)
265            }
266            SqlStatement::Schema(SqlSchemaCommand::DropVector(query)) => {
267                SqlCommand::DropVector(query)
268            }
269            SqlStatement::Schema(SqlSchemaCommand::DropDocument(query)) => {
270                SqlCommand::DropDocument(query)
271            }
272            SqlStatement::Schema(SqlSchemaCommand::DropKv(query)) => SqlCommand::DropKv(query),
273            SqlStatement::Schema(SqlSchemaCommand::DropCollection(query)) => {
274                SqlCommand::DropCollection(query)
275            }
276            SqlStatement::Schema(SqlSchemaCommand::Truncate(query)) => SqlCommand::Truncate(query),
277            SqlStatement::Schema(SqlSchemaCommand::AlterTable(query)) => {
278                SqlCommand::AlterTable(query)
279            }
280            SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query)) => {
281                SqlCommand::CreateIndex(query)
282            }
283            SqlStatement::Schema(SqlSchemaCommand::DropIndex(query)) => {
284                SqlCommand::DropIndex(query)
285            }
286            SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query)) => {
287                SqlCommand::CreateTimeSeries(query)
288            }
289            SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query)) => {
290                SqlCommand::DropTimeSeries(query)
291            }
292            SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query)) => {
293                SqlCommand::CreateQueue(query)
294            }
295            SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query)) => {
296                SqlCommand::AlterQueue(query)
297            }
298            SqlStatement::Schema(SqlSchemaCommand::DropQueue(query)) => {
299                SqlCommand::DropQueue(query)
300            }
301            SqlStatement::Schema(SqlSchemaCommand::CreateTree(query)) => {
302                SqlCommand::CreateTree(query)
303            }
304            SqlStatement::Schema(SqlSchemaCommand::DropTree(query)) => SqlCommand::DropTree(query),
305            SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command)) => {
306                SqlCommand::Probabilistic(command)
307            }
308            SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value }) => {
309                SqlCommand::SetConfig { key, value }
310            }
311            SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix }) => {
312                SqlCommand::ShowConfig { prefix }
313            }
314            SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value }) => {
315                SqlCommand::SetSecret { key, value }
316            }
317            SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key }) => {
318                SqlCommand::DeleteSecret { key }
319            }
320            SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix }) => {
321                SqlCommand::ShowSecrets { prefix }
322            }
323            SqlStatement::Admin(SqlAdminCommand::SetTenant(value)) => SqlCommand::SetTenant(value),
324            SqlStatement::Admin(SqlAdminCommand::ShowTenant) => SqlCommand::ShowTenant,
325            SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl)) => {
326                SqlCommand::TransactionControl(ctl)
327            }
328            SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)) => SqlCommand::Maintenance(cmd),
329            SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)) => SqlCommand::CreateSchema(q),
330            SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)) => SqlCommand::DropSchema(q),
331            SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q)) => {
332                SqlCommand::CreateSequence(q)
333            }
334            SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)) => SqlCommand::DropSequence(q),
335            SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)) => SqlCommand::CopyFrom(q),
336            SqlStatement::Schema(SqlSchemaCommand::CreateView(q)) => SqlCommand::CreateView(q),
337            SqlStatement::Schema(SqlSchemaCommand::DropView(q)) => SqlCommand::DropView(q),
338            SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q)) => {
339                SqlCommand::RefreshMaterializedView(q)
340            }
341            SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)) => SqlCommand::CreatePolicy(q),
342            SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)) => SqlCommand::DropPolicy(q),
343            SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)) => SqlCommand::CreateServer(q),
344            SqlStatement::Schema(SqlSchemaCommand::DropServer(q)) => SqlCommand::DropServer(q),
345            SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q)) => {
346                SqlCommand::CreateForeignTable(q)
347            }
348            SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q)) => {
349                SqlCommand::DropForeignTable(q)
350            }
351            SqlStatement::Admin(SqlAdminCommand::Grant(s)) => SqlCommand::Grant(s),
352            SqlStatement::Admin(SqlAdminCommand::Revoke(s)) => SqlCommand::Revoke(s),
353            SqlStatement::Admin(SqlAdminCommand::AlterUser(s)) => SqlCommand::AlterUser(s),
354            SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)) => SqlCommand::IamPolicy(e),
355            SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q)) => {
356                SqlCommand::CreateMigration(q)
357            }
358            SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q)) => {
359                SqlCommand::ApplyMigration(q)
360            }
361            SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q)) => {
362                SqlCommand::RollbackMigration(q)
363            }
364            SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q)) => {
365                SqlCommand::ExplainMigration(q)
366            }
367        }
368    }
369
370    pub fn into_query_expr(self) -> QueryExpr {
371        self.into_command().into_query_expr()
372    }
373}
374
375impl FrontendStatement {
376    pub fn into_query_expr(self) -> QueryExpr {
377        match self {
378            FrontendStatement::Sql(statement) => statement.into_query_expr(),
379            FrontendStatement::Graph(query) => QueryExpr::Graph(query),
380            FrontendStatement::GraphCommand(command) => QueryExpr::GraphCommand(command),
381            FrontendStatement::Path(query) => QueryExpr::Path(query),
382            FrontendStatement::Vector(query) => QueryExpr::Vector(query),
383            FrontendStatement::Hybrid(query) => QueryExpr::Hybrid(query),
384            FrontendStatement::Search(command) => QueryExpr::SearchCommand(command),
385            FrontendStatement::Ask(query) => QueryExpr::Ask(query),
386            FrontendStatement::QueueSelect(query) => QueryExpr::QueueSelect(query),
387            FrontendStatement::QueueCommand(command) => QueryExpr::QueueCommand(command),
388            FrontendStatement::EventsBackfill(query) => QueryExpr::EventsBackfill(query),
389            FrontendStatement::EventsBackfillStatus { collection } => {
390                QueryExpr::EventsBackfillStatus { collection }
391            }
392            FrontendStatement::TreeCommand(command) => QueryExpr::TreeCommand(command),
393            FrontendStatement::ProbabilisticCommand(command) => {
394                QueryExpr::ProbabilisticCommand(command)
395            }
396            FrontendStatement::KvCommand(command) => QueryExpr::KvCommand(command),
397            FrontendStatement::ConfigCommand(command) => QueryExpr::ConfigCommand(command),
398        }
399    }
400}
401
402pub fn parse_frontend(input: &str) -> Result<FrontendStatement, ParseError> {
403    let mut parser = Parser::new(input)?;
404    let statement = parser.parse_frontend_statement()?;
405    if !parser.check(&Token::Eof) {
406        return Err(ParseError::new(
407            // F-05: `Token::Ident` / `Token::String` / `Token::JsonLiteral`
408            // Display arms emit raw user bytes. Render via `{:?}` so
409            // embedded CR/LF/NUL/quotes are escaped before the message
410            // reaches downstream JSON / audit / log / gRPC sinks.
411            format!("Unexpected token after query: {:?}", parser.current.token),
412            parser.position(),
413        ));
414    }
415    Ok(statement)
416}
417
418impl SqlCommand {
419    pub fn into_query_expr(self) -> QueryExpr {
420        match self {
421            SqlCommand::Select(query) => QueryExpr::Table(query),
422            SqlCommand::Join(query) => QueryExpr::Join(query),
423            SqlCommand::Insert(query) => QueryExpr::Insert(query),
424            SqlCommand::Update(query) => QueryExpr::Update(query),
425            SqlCommand::Delete(query) => QueryExpr::Delete(query),
426            SqlCommand::ExplainAlter(query) => QueryExpr::ExplainAlter(query),
427            SqlCommand::CreateTable(query) => QueryExpr::CreateTable(query),
428            SqlCommand::CreateCollection(query) => QueryExpr::CreateCollection(query),
429            SqlCommand::CreateVector(query) => QueryExpr::CreateVector(query),
430            SqlCommand::DropTable(query) => QueryExpr::DropTable(query),
431            SqlCommand::DropGraph(query) => QueryExpr::DropGraph(query),
432            SqlCommand::DropVector(query) => QueryExpr::DropVector(query),
433            SqlCommand::DropDocument(query) => QueryExpr::DropDocument(query),
434            SqlCommand::DropKv(query) => QueryExpr::DropKv(query),
435            SqlCommand::DropCollection(query) => QueryExpr::DropCollection(query),
436            SqlCommand::Truncate(query) => QueryExpr::Truncate(query),
437            SqlCommand::AlterTable(query) => QueryExpr::AlterTable(query),
438            SqlCommand::CreateIndex(query) => QueryExpr::CreateIndex(query),
439            SqlCommand::DropIndex(query) => QueryExpr::DropIndex(query),
440            SqlCommand::CreateTimeSeries(query) => QueryExpr::CreateTimeSeries(query),
441            SqlCommand::DropTimeSeries(query) => QueryExpr::DropTimeSeries(query),
442            SqlCommand::CreateQueue(query) => QueryExpr::CreateQueue(query),
443            SqlCommand::AlterQueue(query) => QueryExpr::AlterQueue(query),
444            SqlCommand::DropQueue(query) => QueryExpr::DropQueue(query),
445            SqlCommand::CreateTree(query) => QueryExpr::CreateTree(query),
446            SqlCommand::DropTree(query) => QueryExpr::DropTree(query),
447            SqlCommand::Probabilistic(command) => QueryExpr::ProbabilisticCommand(command),
448            SqlCommand::SetConfig { key, value } => QueryExpr::SetConfig { key, value },
449            SqlCommand::ShowConfig { prefix } => QueryExpr::ShowConfig { prefix },
450            SqlCommand::SetSecret { key, value } => QueryExpr::SetSecret { key, value },
451            SqlCommand::DeleteSecret { key } => QueryExpr::DeleteSecret { key },
452            SqlCommand::ShowSecrets { prefix } => QueryExpr::ShowSecrets { prefix },
453            SqlCommand::SetTenant(value) => QueryExpr::SetTenant(value),
454            SqlCommand::ShowTenant => QueryExpr::ShowTenant,
455            SqlCommand::TransactionControl(ctl) => QueryExpr::TransactionControl(ctl),
456            SqlCommand::Maintenance(cmd) => QueryExpr::MaintenanceCommand(cmd),
457            SqlCommand::CreateSchema(q) => QueryExpr::CreateSchema(q),
458            SqlCommand::DropSchema(q) => QueryExpr::DropSchema(q),
459            SqlCommand::CreateSequence(q) => QueryExpr::CreateSequence(q),
460            SqlCommand::DropSequence(q) => QueryExpr::DropSequence(q),
461            SqlCommand::CopyFrom(q) => QueryExpr::CopyFrom(q),
462            SqlCommand::CreateView(q) => QueryExpr::CreateView(q),
463            SqlCommand::DropView(q) => QueryExpr::DropView(q),
464            SqlCommand::RefreshMaterializedView(q) => QueryExpr::RefreshMaterializedView(q),
465            SqlCommand::CreatePolicy(q) => QueryExpr::CreatePolicy(q),
466            SqlCommand::DropPolicy(q) => QueryExpr::DropPolicy(q),
467            SqlCommand::CreateServer(q) => QueryExpr::CreateServer(q),
468            SqlCommand::DropServer(q) => QueryExpr::DropServer(q),
469            SqlCommand::CreateForeignTable(q) => QueryExpr::CreateForeignTable(q),
470            SqlCommand::DropForeignTable(q) => QueryExpr::DropForeignTable(q),
471            SqlCommand::Grant(s) => QueryExpr::Grant(s),
472            SqlCommand::Revoke(s) => QueryExpr::Revoke(s),
473            SqlCommand::AlterUser(s) => QueryExpr::AlterUser(s),
474            SqlCommand::IamPolicy(e) => e,
475            SqlCommand::CreateMigration(q) => QueryExpr::CreateMigration(q),
476            SqlCommand::ApplyMigration(q) => QueryExpr::ApplyMigration(q),
477            SqlCommand::RollbackMigration(q) => QueryExpr::RollbackMigration(q),
478            SqlCommand::ExplainMigration(q) => QueryExpr::ExplainMigration(q),
479        }
480    }
481
482    pub fn into_statement(self) -> SqlStatement {
483        match self {
484            SqlCommand::Select(query) => SqlStatement::Query(SqlQuery::Select(query)),
485            SqlCommand::Join(query) => SqlStatement::Query(SqlQuery::Join(query)),
486            SqlCommand::Insert(query) => SqlStatement::Mutation(SqlMutation::Insert(query)),
487            SqlCommand::Update(query) => SqlStatement::Mutation(SqlMutation::Update(query)),
488            SqlCommand::Delete(query) => SqlStatement::Mutation(SqlMutation::Delete(query)),
489            SqlCommand::ExplainAlter(query) => {
490                SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query))
491            }
492            SqlCommand::CreateTable(query) => {
493                SqlStatement::Schema(SqlSchemaCommand::CreateTable(query))
494            }
495            SqlCommand::CreateCollection(query) => {
496                SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query))
497            }
498            SqlCommand::CreateVector(query) => {
499                SqlStatement::Schema(SqlSchemaCommand::CreateVector(query))
500            }
501            SqlCommand::DropTable(query) => {
502                SqlStatement::Schema(SqlSchemaCommand::DropTable(query))
503            }
504            SqlCommand::DropGraph(query) => {
505                SqlStatement::Schema(SqlSchemaCommand::DropGraph(query))
506            }
507            SqlCommand::DropVector(query) => {
508                SqlStatement::Schema(SqlSchemaCommand::DropVector(query))
509            }
510            SqlCommand::DropDocument(query) => {
511                SqlStatement::Schema(SqlSchemaCommand::DropDocument(query))
512            }
513            SqlCommand::DropKv(query) => SqlStatement::Schema(SqlSchemaCommand::DropKv(query)),
514            SqlCommand::DropCollection(query) => {
515                SqlStatement::Schema(SqlSchemaCommand::DropCollection(query))
516            }
517            SqlCommand::Truncate(query) => SqlStatement::Schema(SqlSchemaCommand::Truncate(query)),
518            SqlCommand::AlterTable(query) => {
519                SqlStatement::Schema(SqlSchemaCommand::AlterTable(query))
520            }
521            SqlCommand::CreateIndex(query) => {
522                SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query))
523            }
524            SqlCommand::DropIndex(query) => {
525                SqlStatement::Schema(SqlSchemaCommand::DropIndex(query))
526            }
527            SqlCommand::CreateTimeSeries(query) => {
528                SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query))
529            }
530            SqlCommand::DropTimeSeries(query) => {
531                SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query))
532            }
533            SqlCommand::CreateQueue(query) => {
534                SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query))
535            }
536            SqlCommand::AlterQueue(query) => {
537                SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query))
538            }
539            SqlCommand::DropQueue(query) => {
540                SqlStatement::Schema(SqlSchemaCommand::DropQueue(query))
541            }
542            SqlCommand::CreateTree(query) => {
543                SqlStatement::Schema(SqlSchemaCommand::CreateTree(query))
544            }
545            SqlCommand::DropTree(query) => SqlStatement::Schema(SqlSchemaCommand::DropTree(query)),
546            SqlCommand::Probabilistic(command) => {
547                SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command))
548            }
549            SqlCommand::SetConfig { key, value } => {
550                SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value })
551            }
552            SqlCommand::ShowConfig { prefix } => {
553                SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix })
554            }
555            SqlCommand::SetSecret { key, value } => {
556                SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value })
557            }
558            SqlCommand::DeleteSecret { key } => {
559                SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key })
560            }
561            SqlCommand::ShowSecrets { prefix } => {
562                SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix })
563            }
564            SqlCommand::SetTenant(value) => SqlStatement::Admin(SqlAdminCommand::SetTenant(value)),
565            SqlCommand::ShowTenant => SqlStatement::Admin(SqlAdminCommand::ShowTenant),
566            SqlCommand::TransactionControl(ctl) => {
567                SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl))
568            }
569            SqlCommand::Maintenance(cmd) => SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)),
570            SqlCommand::CreateSchema(q) => SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)),
571            SqlCommand::DropSchema(q) => SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)),
572            SqlCommand::CreateSequence(q) => {
573                SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q))
574            }
575            SqlCommand::DropSequence(q) => SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)),
576            SqlCommand::CopyFrom(q) => SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)),
577            SqlCommand::CreateView(q) => SqlStatement::Schema(SqlSchemaCommand::CreateView(q)),
578            SqlCommand::DropView(q) => SqlStatement::Schema(SqlSchemaCommand::DropView(q)),
579            SqlCommand::RefreshMaterializedView(q) => {
580                SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q))
581            }
582            SqlCommand::CreatePolicy(q) => SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)),
583            SqlCommand::DropPolicy(q) => SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)),
584            SqlCommand::CreateServer(q) => SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)),
585            SqlCommand::DropServer(q) => SqlStatement::Schema(SqlSchemaCommand::DropServer(q)),
586            SqlCommand::CreateForeignTable(q) => {
587                SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q))
588            }
589            SqlCommand::DropForeignTable(q) => {
590                SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q))
591            }
592            SqlCommand::Grant(s) => SqlStatement::Admin(SqlAdminCommand::Grant(s)),
593            SqlCommand::Revoke(s) => SqlStatement::Admin(SqlAdminCommand::Revoke(s)),
594            SqlCommand::AlterUser(s) => SqlStatement::Admin(SqlAdminCommand::AlterUser(s)),
595            SqlCommand::IamPolicy(e) => SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)),
596            SqlCommand::CreateMigration(q) => {
597                SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q))
598            }
599            SqlCommand::ApplyMigration(q) => {
600                SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q))
601            }
602            SqlCommand::RollbackMigration(q) => {
603                SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q))
604            }
605            SqlCommand::ExplainMigration(q) => {
606                SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q))
607            }
608        }
609    }
610}
611
612impl<'a> Parser<'a> {
613    fn parse_events_command(&mut self) -> Result<QueryExpr, ParseError> {
614        self.expect_ident()?; // EVENTS
615        if self.consume_ident_ci("STATUS")? {
616            let mut query = TableQuery::new("red.subscriptions");
617            let collection = match self.peek().clone() {
618                Token::Ident(name) => {
619                    self.advance()?;
620                    Some(name)
621                }
622                Token::String(name) => {
623                    self.advance()?;
624                    Some(name)
625                }
626                _ => None,
627            };
628            self.parse_table_clauses(&mut query)?;
629            if let Some(collection) = collection {
630                let filter = Filter::compare(
631                    FieldRef::column("red.subscriptions", "collection"),
632                    CompareOp::Eq,
633                    Value::text(collection),
634                );
635                let expr = filter_to_expr(&filter);
636                query.where_expr = Some(match query.where_expr.take() {
637                    Some(existing) => Expr::binop(BinOp::And, existing, expr),
638                    None => expr,
639                });
640                query.filter = Some(match query.filter.take() {
641                    Some(existing) => existing.and(filter),
642                    None => filter,
643                });
644            }
645            return Ok(QueryExpr::Table(query));
646        }
647
648        if !self.consume_ident_ci("BACKFILL")? {
649            return Err(ParseError::expected(
650                vec!["BACKFILL", "STATUS"],
651                self.peek(),
652                self.position(),
653            ));
654        }
655
656        if self.consume_ident_ci("STATUS")? {
657            let collection = self.expect_ident()?;
658            return Ok(QueryExpr::EventsBackfillStatus { collection });
659        }
660
661        let collection = self.expect_ident()?;
662        let where_filter = if self.consume(&Token::Where)? {
663            let mut parts = Vec::new();
664            while !self.check(&Token::Eof) && !self.check(&Token::To) {
665                parts.push(self.peek().to_string());
666                self.advance()?;
667            }
668            if parts.is_empty() {
669                return Err(ParseError::expected(
670                    vec!["predicate"],
671                    self.peek(),
672                    self.position(),
673                ));
674            }
675            Some(parts.join(" "))
676        } else {
677            None
678        };
679
680        self.expect(Token::To)?;
681        let target_queue = self.expect_ident()?;
682        let limit = if self.consume(&Token::Limit)? {
683            Some(self.parse_positive_integer("LIMIT")? as u64)
684        } else {
685            None
686        };
687
688        Ok(QueryExpr::EventsBackfill(EventsBackfillQuery {
689            collection,
690            where_filter,
691            target_queue,
692            limit,
693        }))
694    }
695
696    /// Parse an optional `OPTIONS (key 'value', key2 'value2', ...)` clause
697    /// used by Phase 3.2 FDW DDL statements. Returns an empty vec when the
698    /// clause is absent. Values are always single-quoted string literals —
699    /// consistent with PG's generic-options model.
700    pub(crate) fn parse_fdw_options_clause(&mut self) -> Result<Vec<(String, String)>, ParseError> {
701        if !self.consume(&Token::Options)? {
702            return Ok(Vec::new());
703        }
704        self.expect(Token::LParen)?;
705        let mut out: Vec<(String, String)> = Vec::new();
706        loop {
707            // Option keys frequently collide with reserved words
708            // (`path`, `format`, `delimiter`, `header`, …) — accept
709            // the keyword form and lowercase it so downstream
710            // option-name matching stays case-insensitive.
711            let was_ident = matches!(self.peek(), Token::Ident(_));
712            let raw = self.expect_ident_or_keyword()?;
713            let key = if was_ident {
714                raw
715            } else {
716                raw.to_ascii_lowercase()
717            };
718            // Value is a single-quoted string literal.
719            let value = self.parse_string()?;
720            out.push((key, value));
721            if !self.consume(&Token::Comma)? {
722                break;
723            }
724        }
725        self.expect(Token::RParen)?;
726        Ok(out)
727    }
728
729    /// Parse any top-level frontend statement through a single shared surface.
730    pub fn parse_frontend_statement(&mut self) -> Result<FrontendStatement, ParseError> {
731        match self.peek() {
732            Token::Select => match self.parse_select_query()? {
733                QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
734                    SqlQuery::Select(query),
735                ))),
736                QueryExpr::Join(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
737                    SqlQuery::Join(query),
738                ))),
739                QueryExpr::QueueSelect(query) => Ok(FrontendStatement::QueueSelect(query)),
740                other => Err(ParseError::new(
741                    format!("internal: SELECT produced unexpected query kind {other:?}"),
742                    self.position(),
743                )),
744            },
745            Token::From
746            | Token::Insert
747            | Token::Update
748            | Token::Truncate
749            | Token::Create
750            | Token::Drop
751            | Token::Alter
752            | Token::Set
753            | Token::Begin
754            | Token::Commit
755            | Token::Rollback
756            | Token::Savepoint
757            | Token::Release
758            | Token::Start
759            | Token::Vacuum
760            | Token::Analyze
761            | Token::Copy
762            | Token::Refresh => self.parse_sql_statement().map(FrontendStatement::Sql),
763            Token::Explain => {
764                if matches!(
765                    self.peek_next()?,
766                    Token::Ident(name) if name.eq_ignore_ascii_case("ASK")
767                ) {
768                    match self.parse_explain_ask_query()? {
769                        QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
770                        other => Err(ParseError::new(
771                            format!(
772                                "internal: EXPLAIN ASK produced unexpected query kind {other:?}"
773                            ),
774                            self.position(),
775                        )),
776                    }
777                } else {
778                    self.parse_sql_statement().map(FrontendStatement::Sql)
779                }
780            }
781            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
782                self.parse_sql_statement().map(FrontendStatement::Sql)
783            }
784            Token::Desc => self.parse_sql_statement().map(FrontendStatement::Sql),
785            Token::Ident(name)
786                if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
787            {
788                self.parse_sql_statement().map(FrontendStatement::Sql)
789            }
790            Token::Ident(name)
791                if name.eq_ignore_ascii_case("GRANT")
792                    || name.eq_ignore_ascii_case("REVOKE")
793                    || name.eq_ignore_ascii_case("SIMULATE")
794                    || name.eq_ignore_ascii_case("APPLY") =>
795            {
796                self.parse_sql_statement().map(FrontendStatement::Sql)
797            }
798            Token::Ident(name) if name.eq_ignore_ascii_case("WATCH") => {
799                self.advance()?;
800                if matches!(
801                    self.peek(),
802                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
803                ) {
804                    match self.parse_config_watch_after_watch()? {
805                        QueryExpr::ConfigCommand(command) => {
806                            Ok(FrontendStatement::ConfigCommand(command))
807                        }
808                        other => Err(ParseError::new(
809                            format!(
810                                "internal: WATCH CONFIG produced unexpected query kind {other:?}"
811                            ),
812                            self.position(),
813                        )),
814                    }
815                } else if matches!(
816                    self.peek(),
817                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
818                ) {
819                    match self.parse_vault_watch_after_watch()? {
820                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
821                        other => Err(ParseError::new(
822                            format!(
823                                "internal: WATCH VAULT produced unexpected query kind {other:?}"
824                            ),
825                            self.position(),
826                        )),
827                    }
828                } else {
829                    match self.parse_kv_watch(crate::catalog::CollectionModel::Kv)? {
830                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
831                        other => Err(ParseError::new(
832                            format!("internal: WATCH produced unexpected query kind {other:?}"),
833                            self.position(),
834                        )),
835                    }
836                }
837            }
838            Token::List => {
839                self.advance()?;
840                if matches!(
841                    self.peek(),
842                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
843                ) {
844                    match self.parse_config_list_after_list()? {
845                        QueryExpr::ConfigCommand(command) => {
846                            Ok(FrontendStatement::ConfigCommand(command))
847                        }
848                        other => Err(ParseError::new(
849                            format!(
850                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
851                            ),
852                            self.position(),
853                        )),
854                    }
855                } else if matches!(
856                    self.peek(),
857                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
858                ) {
859                    match self.parse_vault_list_after_list()? {
860                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
861                        other => Err(ParseError::new(
862                            format!(
863                                "internal: LIST VAULT produced unexpected query kind {other:?}"
864                            ),
865                            self.position(),
866                        )),
867                    }
868                } else {
869                    Err(ParseError::expected(
870                        vec!["CONFIG", "VAULT"],
871                        self.peek(),
872                        self.position(),
873                    ))
874                }
875            }
876            Token::Ident(name) if name.eq_ignore_ascii_case("LIST") => {
877                self.advance()?;
878                if matches!(
879                    self.peek(),
880                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
881                ) {
882                    match self.parse_config_list_after_list()? {
883                        QueryExpr::ConfigCommand(command) => {
884                            Ok(FrontendStatement::ConfigCommand(command))
885                        }
886                        other => Err(ParseError::new(
887                            format!(
888                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
889                            ),
890                            self.position(),
891                        )),
892                    }
893                } else if matches!(
894                    self.peek(),
895                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
896                ) {
897                    match self.parse_vault_list_after_list()? {
898                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
899                        other => Err(ParseError::new(
900                            format!(
901                                "internal: LIST VAULT produced unexpected query kind {other:?}"
902                            ),
903                            self.position(),
904                        )),
905                    }
906                } else {
907                    Err(ParseError::expected(
908                        vec!["CONFIG", "VAULT"],
909                        self.peek(),
910                        self.position(),
911                    ))
912                }
913            }
914            Token::Ident(name) if name.eq_ignore_ascii_case("INVALIDATE") => {
915                if matches!(
916                    self.peek_next()?,
917                    Token::Ident(next) if next.eq_ignore_ascii_case("CONFIG")
918                ) {
919                    match self.parse_config_command()? {
920                        QueryExpr::ConfigCommand(command) => {
921                            Ok(FrontendStatement::ConfigCommand(command))
922                        }
923                        other => Err(ParseError::new(
924                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
925                            self.position(),
926                        )),
927                    }
928                } else {
929                    self.advance()?;
930                    match self.parse_kv_invalidate_tags_after_invalidate()? {
931                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
932                        other => Err(ParseError::new(
933                            format!(
934                                "internal: INVALIDATE produced unexpected query kind {other:?}"
935                            ),
936                            self.position(),
937                        )),
938                    }
939                }
940            }
941            Token::Attach | Token::Detach => self.parse_sql_statement().map(FrontendStatement::Sql),
942            Token::Match => match self.parse_match_query()? {
943                QueryExpr::Graph(query) => Ok(FrontendStatement::Graph(query)),
944                other => Err(ParseError::new(
945                    format!("internal: MATCH produced unexpected query kind {other:?}"),
946                    self.position(),
947                )),
948            },
949            Token::Path => match self.parse_path_query()? {
950                QueryExpr::Path(query) => Ok(FrontendStatement::Path(query)),
951                other => Err(ParseError::new(
952                    format!("internal: PATH produced unexpected query kind {other:?}"),
953                    self.position(),
954                )),
955            },
956            Token::Vector => match self.parse_vector_query()? {
957                QueryExpr::Vector(query) => Ok(FrontendStatement::Vector(query)),
958                other => Err(ParseError::new(
959                    format!("internal: VECTOR produced unexpected query kind {other:?}"),
960                    self.position(),
961                )),
962            },
963            Token::Hybrid => match self.parse_hybrid_query()? {
964                QueryExpr::Hybrid(query) => Ok(FrontendStatement::Hybrid(query)),
965                other => Err(ParseError::new(
966                    format!("internal: HYBRID produced unexpected query kind {other:?}"),
967                    self.position(),
968                )),
969            },
970            Token::Graph => match self.parse_graph_command()? {
971                QueryExpr::GraphCommand(command) => Ok(FrontendStatement::GraphCommand(command)),
972                other => Err(ParseError::new(
973                    format!("internal: GRAPH produced unexpected query kind {other:?}"),
974                    self.position(),
975                )),
976            },
977            Token::Search => match self.parse_search_command()? {
978                QueryExpr::SearchCommand(command) => Ok(FrontendStatement::Search(command)),
979                other => Err(ParseError::new(
980                    format!("internal: SEARCH produced unexpected query kind {other:?}"),
981                    self.position(),
982                )),
983            },
984            Token::Ident(name) if name.eq_ignore_ascii_case("ASK") => {
985                match self.parse_ask_query()? {
986                    QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
987                    other => Err(ParseError::new(
988                        format!("internal: ASK produced unexpected query kind {other:?}"),
989                        self.position(),
990                    )),
991                }
992            }
993            Token::Ident(name) if name.eq_ignore_ascii_case("UNSEAL") => {
994                match self.parse_unseal_vault_command()? {
995                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
996                    other => Err(ParseError::new(
997                        format!("internal: UNSEAL VAULT produced unexpected query kind {other:?}"),
998                        self.position(),
999                    )),
1000                }
1001            }
1002            Token::Queue => match self.parse_queue_command()? {
1003                QueryExpr::QueueCommand(command) => Ok(FrontendStatement::QueueCommand(command)),
1004                other => Err(ParseError::new(
1005                    format!("internal: QUEUE produced unexpected query kind {other:?}"),
1006                    self.position(),
1007                )),
1008            },
1009            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
1010                match self.parse_events_command()? {
1011                    QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
1012                        SqlQuery::Select(query),
1013                    ))),
1014                    QueryExpr::EventsBackfill(query) => {
1015                        Ok(FrontendStatement::EventsBackfill(query))
1016                    }
1017                    QueryExpr::EventsBackfillStatus { collection } => {
1018                        Ok(FrontendStatement::EventsBackfillStatus { collection })
1019                    }
1020                    other => Err(ParseError::new(
1021                        format!("internal: EVENTS produced unexpected query kind {other:?}"),
1022                        self.position(),
1023                    )),
1024                }
1025            }
1026            Token::Kv => match self.parse_kv_command()? {
1027                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1028                other => Err(ParseError::new(
1029                    format!("internal: KV produced unexpected query kind {other:?}"),
1030                    self.position(),
1031                )),
1032            },
1033            Token::Delete => {
1034                if matches!(
1035                    self.peek_next()?,
1036                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
1037                ) {
1038                    match self.parse_config_command()? {
1039                        QueryExpr::ConfigCommand(command) => {
1040                            Ok(FrontendStatement::ConfigCommand(command))
1041                        }
1042                        other => Err(ParseError::new(
1043                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1044                            self.position(),
1045                        )),
1046                    }
1047                } else if matches!(
1048                    self.peek_next()?,
1049                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
1050                ) {
1051                    match self.parse_vault_lifecycle_command()? {
1052                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1053                        other => Err(ParseError::new(
1054                            format!("internal: VAULT produced unexpected query kind {other:?}"),
1055                            self.position(),
1056                        )),
1057                    }
1058                } else {
1059                    self.parse_sql_statement().map(FrontendStatement::Sql)
1060                }
1061            }
1062            Token::Add => match self.parse_config_command()? {
1063                QueryExpr::ConfigCommand(command) => Ok(FrontendStatement::ConfigCommand(command)),
1064                other => Err(ParseError::new(
1065                    format!("internal: CONFIG produced unexpected query kind {other:?}"),
1066                    self.position(),
1067                )),
1068            },
1069            Token::Purge => match self.parse_vault_lifecycle_command()? {
1070                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1071                other => Err(ParseError::new(
1072                    format!("internal: VAULT produced unexpected query kind {other:?}"),
1073                    self.position(),
1074                )),
1075            },
1076            Token::Ident(name)
1077                if name.eq_ignore_ascii_case("PUT")
1078                    || name.eq_ignore_ascii_case("GET")
1079                    || name.eq_ignore_ascii_case("RESOLVE")
1080                    || name.eq_ignore_ascii_case("ROTATE")
1081                    || name.eq_ignore_ascii_case("HISTORY")
1082                    || name.eq_ignore_ascii_case("PURGE")
1083                    || name.eq_ignore_ascii_case("INCR")
1084                    || name.eq_ignore_ascii_case("DECR")
1085                    || name.eq_ignore_ascii_case("INVALIDATE") =>
1086            {
1087                if matches!(
1088                    self.peek_next()?,
1089                    Token::Ident(next) if next.eq_ignore_ascii_case("VAULT")
1090                ) {
1091                    match self.parse_vault_lifecycle_command()? {
1092                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1093                        other => Err(ParseError::new(
1094                            format!("internal: VAULT produced unexpected query kind {other:?}"),
1095                            self.position(),
1096                        )),
1097                    }
1098                } else {
1099                    match self.parse_config_command()? {
1100                        QueryExpr::ConfigCommand(command) => {
1101                            Ok(FrontendStatement::ConfigCommand(command))
1102                        }
1103                        other => Err(ParseError::new(
1104                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1105                            self.position(),
1106                        )),
1107                    }
1108                }
1109            }
1110            Token::Ident(name) if name.eq_ignore_ascii_case("VAULT") => {
1111                match self.parse_vault_command()? {
1112                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1113                    other => Err(ParseError::new(
1114                        format!("internal: VAULT produced unexpected query kind {other:?}"),
1115                        self.position(),
1116                    )),
1117                }
1118            }
1119            Token::Tree => match self.parse_tree_command()? {
1120                QueryExpr::TreeCommand(command) => Ok(FrontendStatement::TreeCommand(command)),
1121                other => Err(ParseError::new(
1122                    format!("internal: TREE produced unexpected query kind {other:?}"),
1123                    self.position(),
1124                )),
1125            },
1126            Token::Ident(name) if name.eq_ignore_ascii_case("HLL") => {
1127                match self.parse_hll_command()? {
1128                    QueryExpr::ProbabilisticCommand(command) => {
1129                        Ok(FrontendStatement::ProbabilisticCommand(command))
1130                    }
1131                    other => Err(ParseError::new(
1132                        format!("internal: HLL produced unexpected query kind {other:?}"),
1133                        self.position(),
1134                    )),
1135                }
1136            }
1137            Token::Ident(name) if name.eq_ignore_ascii_case("SKETCH") => {
1138                match self.parse_sketch_command()? {
1139                    QueryExpr::ProbabilisticCommand(command) => {
1140                        Ok(FrontendStatement::ProbabilisticCommand(command))
1141                    }
1142                    other => Err(ParseError::new(
1143                        format!("internal: SKETCH produced unexpected query kind {other:?}"),
1144                        self.position(),
1145                    )),
1146                }
1147            }
1148            Token::Ident(name) if name.eq_ignore_ascii_case("FILTER") => {
1149                match self.parse_filter_command()? {
1150                    QueryExpr::ProbabilisticCommand(command) => {
1151                        Ok(FrontendStatement::ProbabilisticCommand(command))
1152                    }
1153                    other => Err(ParseError::new(
1154                        format!("internal: FILTER produced unexpected query kind {other:?}"),
1155                        self.position(),
1156                    )),
1157                }
1158            }
1159            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => self
1160                .parse_sql_command()
1161                .map(SqlCommand::into_statement)
1162                .map(FrontendStatement::Sql),
1163            other => Err(ParseError::expected(
1164                vec![
1165                    "SELECT", "MATCH", "PATH", "FROM", "VECTOR", "HYBRID", "INSERT", "UPDATE",
1166                    "DELETE", "TRUNCATE", "CREATE", "DROP", "ALTER", "GRAPH", "SEARCH", "ASK",
1167                    "QUEUE", "EVENTS", "KV", "HLL", "TREE", "SKETCH", "FILTER", "SET", "SHOW",
1168                    "DESCRIBE", "DESC",
1169                ],
1170                other,
1171                self.position(),
1172            )),
1173        }
1174    }
1175
1176    /// Parse any SQL/RQL-style command into the canonical SQL frontend IR.
1177    pub fn parse_sql_statement(&mut self) -> Result<SqlStatement, ParseError> {
1178        self.parse_sql_command().map(SqlCommand::into_statement)
1179    }
1180
1181    fn parse_dotted_admin_path(&mut self, lowercase: bool) -> Result<String, ParseError> {
1182        let mut path = self.expect_ident()?;
1183        while self.consume(&Token::Dot)? {
1184            let next = self.expect_ident_or_keyword()?;
1185            path = format!("{path}.{next}");
1186        }
1187        Ok(if lowercase {
1188            path.to_ascii_lowercase()
1189        } else {
1190            path
1191        })
1192    }
1193
1194    /// Parse any SQL/RQL-style command through a single frontend module.
1195    pub fn parse_sql_command(&mut self) -> Result<SqlCommand, ParseError> {
1196        match self.peek() {
1197            Token::Select => match self.parse_select_query()? {
1198                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1199                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1200                other => Err(ParseError::new(
1201                    format!("internal: SELECT produced unexpected query kind {other:?}"),
1202                    self.position(),
1203                )),
1204            },
1205            Token::From => match self.parse_from_query()? {
1206                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1207                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1208                other => Err(ParseError::new(
1209                    format!("internal: FROM produced unexpected query kind {other:?}"),
1210                    self.position(),
1211                )),
1212            },
1213            Token::Insert => match self.parse_insert_query()? {
1214                QueryExpr::Insert(query) => Ok(SqlCommand::Insert(query)),
1215                other => Err(ParseError::new(
1216                    format!("internal: INSERT produced unexpected query kind {other:?}"),
1217                    self.position(),
1218                )),
1219            },
1220            Token::Update => match self.parse_update_query()? {
1221                QueryExpr::Update(query) => Ok(SqlCommand::Update(query)),
1222                other => Err(ParseError::new(
1223                    format!("internal: UPDATE produced unexpected query kind {other:?}"),
1224                    self.position(),
1225                )),
1226            },
1227            Token::Delete => {
1228                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("SECRET"))
1229                {
1230                    self.advance()?; // DELETE
1231                    self.advance()?; // SECRET
1232                    let key = self.parse_dotted_admin_path(true)?;
1233                    Ok(SqlCommand::DeleteSecret { key })
1234                } else {
1235                    match self.parse_delete_query()? {
1236                        QueryExpr::Delete(query) => Ok(SqlCommand::Delete(query)),
1237                        other => Err(ParseError::new(
1238                            format!("internal: DELETE produced unexpected query kind {other:?}"),
1239                            self.position(),
1240                        )),
1241                    }
1242                }
1243            }
1244            Token::Truncate => {
1245                self.advance()?;
1246                let model = if self.consume(&Token::Table)? {
1247                    Some(CollectionModel::Table)
1248                } else if self.consume(&Token::Graph)? {
1249                    Some(CollectionModel::Graph)
1250                } else if self.consume(&Token::Vector)? {
1251                    Some(CollectionModel::Vector)
1252                } else if self.consume(&Token::Document)? {
1253                    Some(CollectionModel::Document)
1254                } else if self.consume(&Token::Timeseries)? {
1255                    Some(CollectionModel::TimeSeries)
1256                } else if self.consume_ident_ci("METRICS")? {
1257                    Some(CollectionModel::Metrics)
1258                } else if self.consume(&Token::Kv)? {
1259                    Some(CollectionModel::Kv)
1260                } else if self.consume(&Token::Queue)? {
1261                    Some(CollectionModel::Queue)
1262                } else if self.consume(&Token::Collection)? {
1263                    None
1264                } else {
1265                    return Err(ParseError::expected(
1266                        vec![
1267                            "TABLE",
1268                            "GRAPH",
1269                            "VECTOR",
1270                            "DOCUMENT",
1271                            "TIMESERIES",
1272                            "METRICS",
1273                            "KV",
1274                            "QUEUE",
1275                            "COLLECTION",
1276                        ],
1277                        self.peek(),
1278                        self.position(),
1279                    ));
1280                };
1281                match self.parse_truncate_body(model)? {
1282                    QueryExpr::Truncate(query) => Ok(SqlCommand::Truncate(query)),
1283                    other => Err(ParseError::new(
1284                        format!("internal: TRUNCATE produced unexpected kind {other:?}"),
1285                        self.position(),
1286                    )),
1287                }
1288            }
1289            Token::Explain => {
1290                // Peek ahead: EXPLAIN MIGRATION name → ExplainMigration
1291                // EXPLAIN ALTER FOR ... → ExplainAlter (existing path)
1292                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
1293                {
1294                    self.advance()?; // consume EXPLAIN
1295                    match self.parse_explain_migration_after_keyword()? {
1296                        QueryExpr::ExplainMigration(q) => Ok(SqlCommand::ExplainMigration(q)),
1297                        other => Err(ParseError::new(
1298                            format!(
1299                                "internal: EXPLAIN MIGRATION produced unexpected kind {other:?}"
1300                            ),
1301                            self.position(),
1302                        )),
1303                    }
1304                } else {
1305                    match self.parse_explain_alter_query()? {
1306                        QueryExpr::ExplainAlter(query) => Ok(SqlCommand::ExplainAlter(query)),
1307                        other => Err(ParseError::new(
1308                            format!("internal: EXPLAIN produced unexpected query kind {other:?}"),
1309                            self.position(),
1310                        )),
1311                    }
1312                }
1313            }
1314            Token::Create => {
1315                let pos = self.position();
1316                self.advance()?;
1317
1318                // CREATE [OR REPLACE] [MATERIALIZED] VIEW [IF NOT EXISTS] name AS <select>
1319                // Detect the VIEW path early so OR REPLACE / MATERIALIZED modifiers
1320                // don't collide with other CREATE variants (TABLE, INDEX, etc.).
1321                let mut or_replace = false;
1322                if self.consume(&Token::Or)? || self.consume_ident_ci("OR")? {
1323                    let _ = self.consume_ident_ci("REPLACE")?;
1324                    or_replace = true;
1325                }
1326                let materialized = self.consume(&Token::Materialized)?;
1327                if self.check(&Token::View) {
1328                    self.advance()?;
1329                    let if_not_exists = self.match_if_not_exists()?;
1330                    let name = self.expect_ident()?;
1331                    // Accept `AS` — the lexer promotes it to `Token::As`
1332                    // (keyword) but some paths still see it as an ident.
1333                    if !self.consume(&Token::As)? && !self.consume_ident_ci("AS")? {
1334                        return Err(ParseError::expected(
1335                            vec!["AS"],
1336                            self.peek(),
1337                            self.position(),
1338                        ));
1339                    }
1340                    // Recursive parse of the body. Any QueryExpr that the
1341                    // rest of the grammar accepts is valid (Select, Join, etc.).
1342                    let body = self.parse_sql_command()?.into_query_expr();
1343                    return Ok(SqlCommand::CreateView(CreateViewQuery {
1344                        name,
1345                        query: Box::new(body),
1346                        materialized,
1347                        if_not_exists,
1348                        or_replace,
1349                    }));
1350                }
1351                // If OR REPLACE / MATERIALIZED was consumed but VIEW was not,
1352                // bail out — no other CREATE form accepts those modifiers.
1353                if or_replace || materialized {
1354                    return Err(ParseError::expected(
1355                        vec!["VIEW"],
1356                        self.peek(),
1357                        self.position(),
1358                    ));
1359                }
1360
1361                if self.check(&Token::Index) || self.check(&Token::Unique) {
1362                    match self.parse_create_index_query()? {
1363                        QueryExpr::CreateIndex(query) => Ok(SqlCommand::CreateIndex(query)),
1364                        other => Err(ParseError::new(
1365                            format!("internal: CREATE INDEX produced unexpected kind {other:?}"),
1366                            self.position(),
1367                        )),
1368                    }
1369                } else if self.check(&Token::Table) {
1370                    self.expect(Token::Table)?;
1371                    match self.parse_create_table_body()? {
1372                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1373                        other => Err(ParseError::new(
1374                            format!("internal: CREATE TABLE produced unexpected kind {other:?}"),
1375                            self.position(),
1376                        )),
1377                    }
1378                } else if self.check(&Token::Graph) {
1379                    self.advance()?;
1380                    match self.parse_create_collection_model_body(CollectionModel::Graph)? {
1381                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1382                        other => Err(ParseError::new(
1383                            format!("internal: CREATE GRAPH produced unexpected kind {other:?}"),
1384                            self.position(),
1385                        )),
1386                    }
1387                } else if self.check(&Token::Document) {
1388                    self.advance()?;
1389                    match self.parse_create_collection_model_body(CollectionModel::Document)? {
1390                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1391                        other => Err(ParseError::new(
1392                            format!("internal: CREATE DOCUMENT produced unexpected kind {other:?}"),
1393                            self.position(),
1394                        )),
1395                    }
1396                } else if self.check(&Token::Vector) {
1397                    self.advance()?;
1398                    match self.parse_create_vector_body()? {
1399                        QueryExpr::CreateVector(query) => Ok(SqlCommand::CreateVector(query)),
1400                        other => Err(ParseError::new(
1401                            format!("internal: CREATE VECTOR produced unexpected kind {other:?}"),
1402                            self.position(),
1403                        )),
1404                    }
1405                } else if self.check(&Token::Collection) {
1406                    self.advance()?;
1407                    match self.parse_create_collection_body()? {
1408                        QueryExpr::CreateCollection(query) => {
1409                            Ok(SqlCommand::CreateCollection(query))
1410                        }
1411                        other => Err(ParseError::new(
1412                            format!(
1413                                "internal: CREATE COLLECTION produced unexpected kind {other:?}"
1414                            ),
1415                            self.position(),
1416                        )),
1417                    }
1418                } else if self.check(&Token::Kv) {
1419                    self.advance()?;
1420                    match self.parse_create_keyed_body(CollectionModel::Kv)? {
1421                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1422                        other => Err(ParseError::new(
1423                            format!("internal: CREATE KV produced unexpected kind {other:?}"),
1424                            self.position(),
1425                        )),
1426                    }
1427                } else if self.consume_ident_ci("CONFIG")? {
1428                    match self.parse_create_keyed_body(CollectionModel::Config)? {
1429                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1430                        other => Err(ParseError::new(
1431                            format!("internal: CREATE CONFIG produced unexpected kind {other:?}"),
1432                            self.position(),
1433                        )),
1434                    }
1435                } else if self.consume_ident_ci("VAULT")? {
1436                    match self.parse_create_keyed_body(CollectionModel::Vault)? {
1437                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1438                        other => Err(ParseError::new(
1439                            format!("internal: CREATE VAULT produced unexpected kind {other:?}"),
1440                            self.position(),
1441                        )),
1442                    }
1443                } else if self.check(&Token::Timeseries) {
1444                    self.advance()?;
1445                    match self.parse_create_timeseries_body()? {
1446                        QueryExpr::CreateTimeSeries(query) => {
1447                            Ok(SqlCommand::CreateTimeSeries(query))
1448                        }
1449                        other => Err(ParseError::new(
1450                            format!(
1451                                "internal: CREATE TIMESERIES produced unexpected kind {other:?}"
1452                            ),
1453                            self.position(),
1454                        )),
1455                    }
1456                } else if self.consume_ident_ci("METRICS")? {
1457                    match self.parse_create_metrics_body()? {
1458                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1459                        other => Err(ParseError::new(
1460                            format!("internal: CREATE METRICS produced unexpected kind {other:?}"),
1461                            self.position(),
1462                        )),
1463                    }
1464                } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
1465                {
1466                    self.advance()?;
1467                    match self.parse_create_hypertable_body()? {
1468                        QueryExpr::CreateTimeSeries(query) => {
1469                            Ok(SqlCommand::CreateTimeSeries(query))
1470                        }
1471                        other => Err(ParseError::new(
1472                            format!(
1473                                "internal: CREATE HYPERTABLE produced unexpected kind {other:?}"
1474                            ),
1475                            self.position(),
1476                        )),
1477                    }
1478                } else if self.check(&Token::Queue) {
1479                    self.advance()?;
1480                    match self.parse_create_queue_body()? {
1481                        QueryExpr::CreateQueue(query) => Ok(SqlCommand::CreateQueue(query)),
1482                        other => Err(ParseError::new(
1483                            format!("internal: CREATE QUEUE produced unexpected kind {other:?}"),
1484                            self.position(),
1485                        )),
1486                    }
1487                } else if self.check(&Token::Tree) {
1488                    self.advance()?;
1489                    match self.parse_create_tree_body()? {
1490                        QueryExpr::CreateTree(query) => Ok(SqlCommand::CreateTree(query)),
1491                        other => Err(ParseError::new(
1492                            format!("internal: CREATE TREE produced unexpected kind {other:?}"),
1493                            self.position(),
1494                        )),
1495                    }
1496                } else if matches!(self.peek(), Token::Ident(n) if
1497                    n.eq_ignore_ascii_case("HLL") ||
1498                    n.eq_ignore_ascii_case("SKETCH") ||
1499                    n.eq_ignore_ascii_case("FILTER"))
1500                {
1501                    match self.parse_create_probabilistic()? {
1502                        QueryExpr::ProbabilisticCommand(command) => {
1503                            Ok(SqlCommand::Probabilistic(command))
1504                        }
1505                        other => Err(ParseError::new(
1506                            format!(
1507                                "internal: CREATE probabilistic produced unexpected kind {other:?}"
1508                            ),
1509                            self.position(),
1510                        )),
1511                    }
1512                } else if self.check(&Token::Schema) {
1513                    // CREATE SCHEMA [IF NOT EXISTS] name
1514                    self.advance()?;
1515                    let if_not_exists = self.match_if_not_exists()?;
1516                    let name = self.expect_ident()?;
1517                    Ok(SqlCommand::CreateSchema(CreateSchemaQuery {
1518                        name,
1519                        if_not_exists,
1520                    }))
1521                } else if self.check(&Token::Policy) {
1522                    // Two forms share the leading `CREATE POLICY` tokens:
1523                    //   * IAM:   CREATE POLICY '<id>' AS '<json>'          (string literal id)
1524                    //   * RLS:   CREATE POLICY <name> ON <target> ...      (bare ident name)
1525                    // Disambiguate by peeking the token after POLICY.
1526                    self.advance()?;
1527                    if matches!(self.peek(), Token::String(_)) {
1528                        // IAM form — short-circuit out of the SQL command stack.
1529                        let expr = self.parse_create_iam_policy_after_keywords()?;
1530                        // Wrap the parsed expression in `SqlCommand::IamPolicy` and
1531                        // return immediately. The RLS clauses parsed below
1532                        // (ON <target>, optional FOR / TO, USING (...)) belong
1533                        // only to the bare-identifier form, so they are skipped
1534                        // for the string-literal IAM form.
1535                        return Ok(SqlCommand::IamPolicy(expr));
1536                    }
1537                    let name = self.expect_ident()?;
1538                    self.expect(Token::On)?;
1539
1540                    let (target_kind, table) = {
1541                        use crate::storage::query::ast::PolicyTargetKind;
1542                        let kw = match self.peek() {
1543                            Token::Ident(s) => Some(s.to_ascii_uppercase()),
1544                            _ => None,
1545                        };
1546                        let kind = kw.as_deref().and_then(|k| match k {
1547                            "NODES" => Some(PolicyTargetKind::Nodes),
1548                            "EDGES" => Some(PolicyTargetKind::Edges),
1549                            "VECTORS" => Some(PolicyTargetKind::Vectors),
1550                            "MESSAGES" => Some(PolicyTargetKind::Messages),
1551                            "POINTS" => Some(PolicyTargetKind::Points),
1552                            "DOCUMENTS" => Some(PolicyTargetKind::Documents),
1553                            _ => None,
1554                        });
1555                        if let Some(k) = kind {
1556                            self.advance()?;
1557                            self.expect(Token::Of)?;
1558                            let coll = self.expect_ident()?;
1559                            (k, coll)
1560                        } else {
1561                            let coll = self.expect_ident()?;
1562                            (PolicyTargetKind::Table, coll)
1563                        }
1564                    };
1565
1566                    let action = if self.consume(&Token::For)? {
1567                        let a = match self.peek() {
1568                            Token::Select => {
1569                                self.advance()?;
1570                                Some(PolicyAction::Select)
1571                            }
1572                            Token::Insert => {
1573                                self.advance()?;
1574                                Some(PolicyAction::Insert)
1575                            }
1576                            Token::Update => {
1577                                self.advance()?;
1578                                Some(PolicyAction::Update)
1579                            }
1580                            Token::Delete => {
1581                                self.advance()?;
1582                                Some(PolicyAction::Delete)
1583                            }
1584                            Token::All => {
1585                                self.advance()?;
1586                                None
1587                            }
1588                            _ => None,
1589                        };
1590                        a
1591                    } else {
1592                        None
1593                    };
1594
1595                    let role = if self.consume(&Token::To)? {
1596                        Some(self.expect_ident()?)
1597                    } else {
1598                        None
1599                    };
1600
1601                    self.expect(Token::Using)?;
1602                    self.expect(Token::LParen)?;
1603                    let filter = self.parse_filter()?;
1604                    self.expect(Token::RParen)?;
1605
1606                    Ok(SqlCommand::CreatePolicy(CreatePolicyQuery {
1607                        name,
1608                        table,
1609                        action,
1610                        role,
1611                        using: Box::new(filter),
1612                        target_kind,
1613                    }))
1614                } else if self.check(&Token::Server) {
1615                    // CREATE SERVER [IF NOT EXISTS] name
1616                    //   FOREIGN DATA WRAPPER kind
1617                    //   [OPTIONS (key 'value', ...)]
1618                    self.advance()?;
1619                    let if_not_exists = self.match_if_not_exists()?;
1620                    let name = self.expect_ident()?;
1621                    self.expect(Token::Foreign)?;
1622                    self.expect(Token::Data)?;
1623                    self.expect(Token::Wrapper)?;
1624                    let wrapper = self.expect_ident()?;
1625                    let options = self.parse_fdw_options_clause()?;
1626                    Ok(SqlCommand::CreateServer(CreateServerQuery {
1627                        name,
1628                        wrapper,
1629                        options,
1630                        if_not_exists,
1631                    }))
1632                } else if self.check(&Token::Foreign) {
1633                    // CREATE FOREIGN TABLE [IF NOT EXISTS] name (cols)
1634                    //   SERVER server_name
1635                    //   [OPTIONS (key 'value', ...)]
1636                    self.advance()?;
1637                    self.expect(Token::Table)?;
1638                    let if_not_exists = self.match_if_not_exists()?;
1639                    let name = self.expect_ident()?;
1640                    self.expect(Token::LParen)?;
1641                    let mut columns = Vec::new();
1642                    loop {
1643                        let col_name = self.expect_ident()?;
1644                        let data_type = self.expect_ident_or_keyword()?;
1645                        // Inline NOT NULL check — the CREATE TABLE path's helper is
1646                        // private and coupling to it just for FDW columns isn't worth it.
1647                        let mut not_null = false;
1648                        if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NOT")) {
1649                            self.advance()?;
1650                            if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NULL"))
1651                            {
1652                                self.advance()?;
1653                                not_null = true;
1654                            }
1655                        }
1656                        columns.push(ForeignColumnDef {
1657                            name: col_name,
1658                            data_type,
1659                            not_null,
1660                        });
1661                        if !self.consume(&Token::Comma)? {
1662                            break;
1663                        }
1664                    }
1665                    self.expect(Token::RParen)?;
1666                    self.expect(Token::Server)?;
1667                    let server = self.expect_ident()?;
1668                    let options = self.parse_fdw_options_clause()?;
1669                    Ok(SqlCommand::CreateForeignTable(CreateForeignTableQuery {
1670                        name,
1671                        server,
1672                        columns,
1673                        options,
1674                        if_not_exists,
1675                    }))
1676                } else if self.check(&Token::Sequence) {
1677                    // CREATE SEQUENCE [IF NOT EXISTS] name
1678                    //   [START [WITH] n] [INCREMENT [BY] n]
1679                    self.advance()?;
1680                    let if_not_exists = self.match_if_not_exists()?;
1681                    let name = self.expect_ident()?;
1682                    let mut start: i64 = 1;
1683                    let mut increment: i64 = 1;
1684                    // Loop over optional clauses in any order.
1685                    loop {
1686                        if self.consume(&Token::Start)? {
1687                            // Accept `START 100` or `START WITH 100`.
1688                            let _ = self.consume(&Token::With)? || self.consume_ident_ci("WITH")?;
1689                            start = self.parse_integer()?;
1690                        } else if self.consume(&Token::Increment)? {
1691                            // Accept `INCREMENT 5` or `INCREMENT BY 5`.
1692                            let _ = self.consume(&Token::By)? || self.consume_ident_ci("BY")?;
1693                            increment = self.parse_integer()?;
1694                        } else {
1695                            break;
1696                        }
1697                    }
1698                    Ok(SqlCommand::CreateSequence(CreateSequenceQuery {
1699                        name,
1700                        if_not_exists,
1701                        start,
1702                        increment,
1703                    }))
1704                } else if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
1705                {
1706                    self.advance()?; // consume MIGRATION
1707                    match self.parse_create_migration_body()? {
1708                        QueryExpr::CreateMigration(q) => Ok(SqlCommand::CreateMigration(q)),
1709                        other => Err(ParseError::new(
1710                            format!(
1711                                "internal: CREATE MIGRATION produced unexpected kind {other:?}"
1712                            ),
1713                            self.position(),
1714                        )),
1715                    }
1716                } else if let Some(err) =
1717                    ParseError::unsupported_recognized_token(self.peek(), self.position())
1718                {
1719                    Err(err)
1720                } else {
1721                    Err(ParseError::expected(
1722                        vec![
1723                            "TABLE",
1724                            "GRAPH",
1725                            "VECTOR",
1726                            "DOCUMENT",
1727                            "KV",
1728                            "COLLECTION",
1729                            "INDEX",
1730                            "UNIQUE",
1731                            "TIMESERIES",
1732                            "QUEUE",
1733                            "TREE",
1734                            "HLL",
1735                            "SKETCH",
1736                            "FILTER",
1737                            "SCHEMA",
1738                            "SEQUENCE",
1739                            "MIGRATION",
1740                        ],
1741                        self.peek(),
1742                        pos,
1743                    ))
1744                }
1745            }
1746            Token::Drop => {
1747                let pos = self.position();
1748                self.advance()?;
1749
1750                // DROP [MATERIALIZED] VIEW [IF EXISTS] name
1751                let materialized = self.consume(&Token::Materialized)?;
1752                if self.check(&Token::View) {
1753                    self.advance()?;
1754                    let if_exists = self.match_if_exists()?;
1755                    let name = self.expect_ident()?;
1756                    return Ok(SqlCommand::DropView(DropViewQuery {
1757                        name,
1758                        materialized,
1759                        if_exists,
1760                    }));
1761                }
1762                if materialized {
1763                    return Err(ParseError::expected(
1764                        vec!["VIEW"],
1765                        self.peek(),
1766                        self.position(),
1767                    ));
1768                }
1769
1770                if self.check(&Token::Index) {
1771                    match self.parse_drop_index_query()? {
1772                        QueryExpr::DropIndex(query) => Ok(SqlCommand::DropIndex(query)),
1773                        other => Err(ParseError::new(
1774                            format!("internal: DROP INDEX produced unexpected kind {other:?}"),
1775                            self.position(),
1776                        )),
1777                    }
1778                } else if self.check(&Token::Table) {
1779                    self.expect(Token::Table)?;
1780                    match self.parse_drop_table_body()? {
1781                        QueryExpr::DropTable(query) => Ok(SqlCommand::DropTable(query)),
1782                        other => Err(ParseError::new(
1783                            format!("internal: DROP TABLE produced unexpected kind {other:?}"),
1784                            self.position(),
1785                        )),
1786                    }
1787                } else if self.check(&Token::Graph) {
1788                    self.advance()?;
1789                    match self.parse_drop_graph_body()? {
1790                        QueryExpr::DropGraph(query) => Ok(SqlCommand::DropGraph(query)),
1791                        other => Err(ParseError::new(
1792                            format!("internal: DROP GRAPH produced unexpected kind {other:?}"),
1793                            self.position(),
1794                        )),
1795                    }
1796                } else if self.check(&Token::Vector) {
1797                    self.advance()?;
1798                    match self.parse_drop_vector_body()? {
1799                        QueryExpr::DropVector(query) => Ok(SqlCommand::DropVector(query)),
1800                        other => Err(ParseError::new(
1801                            format!("internal: DROP VECTOR produced unexpected kind {other:?}"),
1802                            self.position(),
1803                        )),
1804                    }
1805                } else if self.check(&Token::Document) {
1806                    self.advance()?;
1807                    match self.parse_drop_document_body()? {
1808                        QueryExpr::DropDocument(query) => Ok(SqlCommand::DropDocument(query)),
1809                        other => Err(ParseError::new(
1810                            format!("internal: DROP DOCUMENT produced unexpected kind {other:?}"),
1811                            self.position(),
1812                        )),
1813                    }
1814                } else if self.check(&Token::Kv) {
1815                    self.advance()?;
1816                    match self.parse_drop_kv_body()? {
1817                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1818                        other => Err(ParseError::new(
1819                            format!("internal: DROP KV produced unexpected kind {other:?}"),
1820                            self.position(),
1821                        )),
1822                    }
1823                } else if self.consume_ident_ci("CONFIG")? {
1824                    match self.parse_drop_keyed_body(CollectionModel::Config)? {
1825                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1826                        other => Err(ParseError::new(
1827                            format!("internal: DROP CONFIG produced unexpected kind {other:?}"),
1828                            self.position(),
1829                        )),
1830                    }
1831                } else if self.consume_ident_ci("VAULT")? {
1832                    match self.parse_drop_keyed_body(CollectionModel::Vault)? {
1833                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1834                        other => Err(ParseError::new(
1835                            format!("internal: DROP VAULT produced unexpected kind {other:?}"),
1836                            self.position(),
1837                        )),
1838                    }
1839                } else if self.check(&Token::Collection) {
1840                    self.advance()?;
1841                    match self.parse_drop_collection_body()? {
1842                        QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
1843                        other => Err(ParseError::new(
1844                            format!("internal: DROP COLLECTION produced unexpected kind {other:?}"),
1845                            self.position(),
1846                        )),
1847                    }
1848                } else if self.check(&Token::Timeseries) {
1849                    self.advance()?;
1850                    match self.parse_drop_timeseries_body()? {
1851                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
1852                        other => Err(ParseError::new(
1853                            format!("internal: DROP TIMESERIES produced unexpected kind {other:?}"),
1854                            self.position(),
1855                        )),
1856                    }
1857                } else if self.consume_ident_ci("METRICS")? {
1858                    match self.parse_drop_collection_model_body(Some(CollectionModel::Metrics))? {
1859                        QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
1860                        other => Err(ParseError::new(
1861                            format!("internal: DROP METRICS produced unexpected kind {other:?}"),
1862                            self.position(),
1863                        )),
1864                    }
1865                } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
1866                {
1867                    // DROP HYPERTABLE name reuses the same AST as
1868                    // DROP TIMESERIES — runtime clears the registry
1869                    // entry *and* drops the backing collection.
1870                    self.advance()?;
1871                    match self.parse_drop_timeseries_body()? {
1872                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
1873                        other => Err(ParseError::new(
1874                            format!("internal: DROP HYPERTABLE produced unexpected kind {other:?}"),
1875                            self.position(),
1876                        )),
1877                    }
1878                } else if self.check(&Token::Queue) {
1879                    self.advance()?;
1880                    match self.parse_drop_queue_body()? {
1881                        QueryExpr::DropQueue(query) => Ok(SqlCommand::DropQueue(query)),
1882                        other => Err(ParseError::new(
1883                            format!("internal: DROP QUEUE produced unexpected kind {other:?}"),
1884                            self.position(),
1885                        )),
1886                    }
1887                } else if self.check(&Token::Tree) {
1888                    self.advance()?;
1889                    match self.parse_drop_tree_body()? {
1890                        QueryExpr::DropTree(query) => Ok(SqlCommand::DropTree(query)),
1891                        other => Err(ParseError::new(
1892                            format!("internal: DROP TREE produced unexpected kind {other:?}"),
1893                            self.position(),
1894                        )),
1895                    }
1896                } else if matches!(self.peek(), Token::Ident(n) if
1897                    n.eq_ignore_ascii_case("HLL") ||
1898                    n.eq_ignore_ascii_case("SKETCH") ||
1899                    n.eq_ignore_ascii_case("FILTER"))
1900                {
1901                    match self.parse_drop_probabilistic()? {
1902                        QueryExpr::ProbabilisticCommand(command) => {
1903                            Ok(SqlCommand::Probabilistic(command))
1904                        }
1905                        other => Err(ParseError::new(
1906                            format!(
1907                                "internal: DROP probabilistic produced unexpected kind {other:?}"
1908                            ),
1909                            self.position(),
1910                        )),
1911                    }
1912                } else if self.check(&Token::Schema) {
1913                    // DROP SCHEMA [IF EXISTS] name [CASCADE]
1914                    self.advance()?;
1915                    let if_exists = self.match_if_exists()?;
1916                    let name = self.expect_ident()?;
1917                    let cascade = self.consume(&Token::Cascade)?;
1918                    Ok(SqlCommand::DropSchema(DropSchemaQuery {
1919                        name,
1920                        if_exists,
1921                        cascade,
1922                    }))
1923                } else if self.check(&Token::Policy) {
1924                    // Two forms:
1925                    //   * IAM:   DROP POLICY '<id>'
1926                    //   * RLS:   DROP POLICY [IF EXISTS] name ON table
1927                    self.advance()?;
1928                    if matches!(self.peek(), Token::String(_)) {
1929                        let expr = self.parse_drop_iam_policy_after_keywords()?;
1930                        return Ok(SqlCommand::IamPolicy(expr));
1931                    }
1932                    let if_exists = self.match_if_exists()?;
1933                    let name = self.expect_ident()?;
1934                    self.expect(Token::On)?;
1935                    let table = self.expect_ident()?;
1936                    Ok(SqlCommand::DropPolicy(DropPolicyQuery {
1937                        name,
1938                        table,
1939                        if_exists,
1940                    }))
1941                } else if self.check(&Token::Server) {
1942                    // DROP SERVER [IF EXISTS] name [CASCADE]
1943                    self.advance()?;
1944                    let if_exists = self.match_if_exists()?;
1945                    let name = self.expect_ident()?;
1946                    let cascade = self.consume(&Token::Cascade)?;
1947                    Ok(SqlCommand::DropServer(DropServerQuery {
1948                        name,
1949                        if_exists,
1950                        cascade,
1951                    }))
1952                } else if self.check(&Token::Foreign) {
1953                    // DROP FOREIGN TABLE [IF EXISTS] name
1954                    self.advance()?;
1955                    self.expect(Token::Table)?;
1956                    let if_exists = self.match_if_exists()?;
1957                    let name = self.expect_ident()?;
1958                    Ok(SqlCommand::DropForeignTable(DropForeignTableQuery {
1959                        name,
1960                        if_exists,
1961                    }))
1962                } else if self.check(&Token::Sequence) {
1963                    // DROP SEQUENCE [IF EXISTS] name
1964                    self.advance()?;
1965                    let if_exists = self.match_if_exists()?;
1966                    let name = self.expect_ident()?;
1967                    Ok(SqlCommand::DropSequence(DropSequenceQuery {
1968                        name,
1969                        if_exists,
1970                    }))
1971                } else if let Some(err) =
1972                    ParseError::unsupported_recognized_token(self.peek(), self.position())
1973                {
1974                    Err(err)
1975                } else {
1976                    Err(ParseError::expected(
1977                        vec![
1978                            "TABLE",
1979                            "INDEX",
1980                            "TIMESERIES",
1981                            "QUEUE",
1982                            "TREE",
1983                            "HLL",
1984                            "SKETCH",
1985                            "FILTER",
1986                            "SCHEMA",
1987                            "SEQUENCE",
1988                        ],
1989                        self.peek(),
1990                        pos,
1991                    ))
1992                }
1993            }
1994            Token::Alter => {
1995                // Disambiguate ALTER USER / ALTER QUEUE / ALTER TABLE without
1996                // committing to a path until we've seen the target.
1997                // We peek the *next* token (without consuming) and
1998                // dispatch accordingly.
1999                let next = self.peek_next()?.clone();
2000                if matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("USER")) {
2001                    self.advance()?; // consume ALTER
2002                    let stmt = self.parse_alter_user_statement()?;
2003                    Ok(SqlCommand::AlterUser(stmt))
2004                } else if matches!(next, Token::Queue) {
2005                    self.advance()?; // consume ALTER
2006                    self.advance()?; // consume QUEUE
2007                    match self.parse_alter_queue_body()? {
2008                        QueryExpr::AlterQueue(query) => Ok(SqlCommand::AlterQueue(query)),
2009                        other => Err(ParseError::new(
2010                            format!("internal: ALTER QUEUE produced unexpected kind {other:?}"),
2011                            self.position(),
2012                        )),
2013                    }
2014                } else if matches!(next, Token::Table) {
2015                    match self.parse_alter_table_query()? {
2016                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2017                        other => Err(ParseError::new(
2018                            format!(
2019                                "internal: ALTER TABLE produced unexpected query kind {other:?}"
2020                            ),
2021                            self.position(),
2022                        )),
2023                    }
2024                } else if let Some(err) =
2025                    ParseError::unsupported_recognized_token(&next, self.position())
2026                {
2027                    Err(err)
2028                } else {
2029                    match self.parse_alter_table_query()? {
2030                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2031                        other => Err(ParseError::new(
2032                            format!("internal: ALTER produced unexpected query kind {other:?}"),
2033                            self.position(),
2034                        )),
2035                    }
2036                }
2037            }
2038            Token::Ident(name) if name.eq_ignore_ascii_case("GRANT") => {
2039                let stmt = self.parse_grant_statement()?;
2040                Ok(SqlCommand::Grant(stmt))
2041            }
2042            Token::Ident(name) if name.eq_ignore_ascii_case("REVOKE") => {
2043                let stmt = self.parse_revoke_statement()?;
2044                Ok(SqlCommand::Revoke(stmt))
2045            }
2046            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
2047                self.advance()?;
2048                if self.consume_ident_ci("BACKFILL")? {
2049                    return Err(ParseError::new(
2050                        "EVENTS BACKFILL STATUS is not implemented; EVENTS BACKFILL runtime is available but durable progress tracking is not"
2051                            .to_string(),
2052                        self.position(),
2053                    ));
2054                }
2055                if !self.consume_ident_ci("STATUS")? {
2056                    return Err(ParseError::expected(
2057                        vec!["STATUS"],
2058                        self.peek(),
2059                        self.position(),
2060                    ));
2061                }
2062
2063                let mut query = TableQuery::new("red.subscriptions");
2064                let collection = match self.peek().clone() {
2065                    Token::Ident(name) => {
2066                        self.advance()?;
2067                        Some(name)
2068                    }
2069                    Token::String(name) => {
2070                        self.advance()?;
2071                        Some(name)
2072                    }
2073                    _ => None,
2074                };
2075                self.parse_table_clauses(&mut query)?;
2076                if let Some(collection) = collection {
2077                    let filter = Filter::compare(
2078                        FieldRef::column("red.subscriptions", "collection"),
2079                        CompareOp::Eq,
2080                        Value::text(collection),
2081                    );
2082                    let expr = filter_to_expr(&filter);
2083                    query.where_expr = Some(match query.where_expr.take() {
2084                        Some(existing) => Expr::binop(BinOp::And, existing, expr),
2085                        None => expr,
2086                    });
2087                    query.filter = Some(match query.filter.take() {
2088                        Some(existing) => existing.and(filter),
2089                        None => filter,
2090                    });
2091                }
2092                Ok(SqlCommand::Select(query))
2093            }
2094            Token::Attach => {
2095                let expr = self.parse_attach_policy()?;
2096                Ok(SqlCommand::IamPolicy(expr))
2097            }
2098            Token::Detach => {
2099                let expr = self.parse_detach_policy()?;
2100                Ok(SqlCommand::IamPolicy(expr))
2101            }
2102            Token::Ident(name) if name.eq_ignore_ascii_case("SIMULATE") => {
2103                let expr = self.parse_simulate_policy()?;
2104                Ok(SqlCommand::IamPolicy(expr))
2105            }
2106            Token::Set => {
2107                self.advance()?;
2108                if self.consume_ident_ci("CONFIG")? {
2109                    let full_key = self.parse_dotted_admin_path(true)?;
2110                    self.expect(Token::Eq)?;
2111                    let value = self.parse_literal_value()?;
2112                    Ok(SqlCommand::SetConfig {
2113                        key: full_key,
2114                        value,
2115                    })
2116                } else if self.consume_ident_ci("SECRET")? {
2117                    let key = self.parse_dotted_admin_path(true)?;
2118                    self.expect(Token::Eq)?;
2119                    let value = self.parse_literal_value()?;
2120                    Ok(SqlCommand::SetSecret { key, value })
2121                } else if self.consume_ident_ci("TENANT")? {
2122                    // SET TENANT 'id'  |  SET TENANT = 'id'  |
2123                    // SET TENANT NULL  |  SET TENANT = NULL
2124                    let _ = self.consume(&Token::Eq)?;
2125                    if self.consume_ident_ci("NULL")? {
2126                        Ok(SqlCommand::SetTenant(None))
2127                    } else {
2128                        let value = self.parse_literal_value()?;
2129                        match value {
2130                            Value::Text(s) => Ok(SqlCommand::SetTenant(Some(s.to_string()))),
2131                            Value::Null => Ok(SqlCommand::SetTenant(None)),
2132                            other => Err(ParseError::new(
2133                                format!("SET TENANT expects a text literal or NULL, got {other:?}"),
2134                                self.position(),
2135                            )),
2136                        }
2137                    }
2138                } else {
2139                    Err(ParseError::expected(
2140                        vec!["CONFIG", "SECRET", "TENANT"],
2141                        self.peek(),
2142                        self.position(),
2143                    ))
2144                }
2145            }
2146            Token::Ident(name) if name.eq_ignore_ascii_case("APPLY") => {
2147                self.advance()?;
2148                match self.parse_apply_migration()? {
2149                    QueryExpr::ApplyMigration(q) => Ok(SqlCommand::ApplyMigration(q)),
2150                    other => Err(ParseError::new(
2151                        format!("internal: APPLY MIGRATION produced unexpected kind {other:?}"),
2152                        self.position(),
2153                    )),
2154                }
2155            }
2156            Token::Ident(name) if name.eq_ignore_ascii_case("RESET") => {
2157                // RESET TENANT — session-local clear
2158                self.advance()?;
2159                if self.consume_ident_ci("TENANT")? {
2160                    Ok(SqlCommand::SetTenant(None))
2161                } else {
2162                    Err(ParseError::expected(
2163                        vec!["TENANT"],
2164                        self.peek(),
2165                        self.position(),
2166                    ))
2167                }
2168            }
2169            Token::Ident(name)
2170                if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
2171            {
2172                self.advance()?;
2173                let collection = self.parse_dotted_admin_path(false)?;
2174                let mut query = TableQuery::new("red.describe");
2175                query.filter = Some(Filter::compare(
2176                    FieldRef::column("", "collection"),
2177                    CompareOp::Eq,
2178                    Value::text(collection),
2179                ));
2180                Ok(SqlCommand::Select(query))
2181            }
2182            Token::Desc => {
2183                self.advance()?;
2184                let collection = self.parse_dotted_admin_path(false)?;
2185                let mut query = TableQuery::new("red.describe");
2186                query.filter = Some(Filter::compare(
2187                    FieldRef::column("", "collection"),
2188                    CompareOp::Eq,
2189                    Value::text(collection),
2190                ));
2191                Ok(SqlCommand::Select(query))
2192            }
2193            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
2194                self.advance()?;
2195                if self.consume(&Token::Create)? || self.consume_ident_ci("CREATE")? {
2196                    if !(self.consume(&Token::Table)? || self.consume_ident_ci("TABLE")?) {
2197                        return Err(ParseError::expected(
2198                            vec!["TABLE"],
2199                            self.peek(),
2200                            self.position(),
2201                        ));
2202                    }
2203                    let collection = self.parse_dotted_admin_path(false)?;
2204                    let mut query = TableQuery::new("red.show_create");
2205                    query.filter = Some(Filter::compare(
2206                        FieldRef::column("", "collection"),
2207                        CompareOp::Eq,
2208                        Value::text(collection),
2209                    ));
2210                    Ok(SqlCommand::Select(query))
2211                } else if self.consume_ident_ci("CONFIG")? {
2212                    // Accept dotted prefixes the same way SET CONFIG does
2213                    // (`SHOW CONFIG durability.mode`), and empty prefix
2214                    // (`SHOW CONFIG`) for a catalog-wide listing.
2215                    let prefix = if !self.check(&Token::Eof) {
2216                        let first = self.expect_ident()?;
2217                        let mut full = first;
2218                        while self.consume(&Token::Dot)? {
2219                            let next = self.expect_ident_or_keyword()?;
2220                            full = format!("{full}.{next}");
2221                        }
2222                        // Match SET CONFIG: lowercase so keyword segments
2223                        // come out consistent with the stored keys.
2224                        Some(full.to_ascii_lowercase())
2225                    } else {
2226                        None
2227                    };
2228                    Ok(SqlCommand::ShowConfig { prefix })
2229                } else if self.consume_ident_ci("COLLECTIONS")? {
2230                    let mut query = TableQuery::new("red.collections");
2231                    let include_internal = if self.consume_ident_ci("INCLUDING")? {
2232                        if !self.consume_ident_ci("INTERNAL")? {
2233                            return Err(ParseError::expected(
2234                                vec!["INTERNAL"],
2235                                self.peek(),
2236                                self.position(),
2237                            ));
2238                        }
2239                        true
2240                    } else {
2241                        false
2242                    };
2243                    self.parse_table_clauses(&mut query)?;
2244                    if !include_internal {
2245                        let user_filter = query.filter.take();
2246                        let hide_internal = crate::storage::query::ast::Filter::Compare {
2247                            field: FieldRef::column("", "internal"),
2248                            op: CompareOp::Eq,
2249                            value: Value::Boolean(false),
2250                        };
2251                        query.filter = Some(match user_filter {
2252                            Some(filter) => filter.and(hide_internal),
2253                            None => hide_internal,
2254                        });
2255                    }
2256                    Ok(SqlCommand::Select(query))
2257                } else if self.consume_ident_ci("TABLES")? {
2258                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2259                        self, "table",
2260                    )?))
2261                } else if self.consume_ident_ci("QUEUES")? {
2262                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2263                        self, "queue",
2264                    )?))
2265                } else if self.consume(&Token::Vectors)? || self.consume_ident_ci("VECTORS")? {
2266                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2267                        self, "vector",
2268                    )?))
2269                } else if self.consume_ident_ci("DOCUMENTS")? {
2270                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2271                        self, "document",
2272                    )?))
2273                } else if self.consume(&Token::Timeseries)?
2274                    || self.consume_ident_ci("TIMESERIES")?
2275                {
2276                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2277                        self,
2278                        "timeseries",
2279                    )?))
2280                } else if self.consume_ident_ci("GRAPHS")? {
2281                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2282                        self, "graph",
2283                    )?))
2284                } else if self.consume_ident_ci("CONFIGS")? {
2285                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2286                        self, "config",
2287                    )?))
2288                } else if self.consume_ident_ci("VAULTS")? {
2289                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2290                        self, "vault",
2291                    )?))
2292                } else if self.consume(&Token::Kv)?
2293                    || self.consume_ident_ci("KV")?
2294                    || self.consume_ident_ci("KVS")?
2295                {
2296                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2297                        self, "kv",
2298                    )?))
2299                } else if self.consume(&Token::Schema)? || self.consume_ident_ci("SCHEMA")? {
2300                    let collection = self.parse_dotted_admin_path(false)?;
2301                    let mut query = TableQuery::new("red.columns");
2302                    query.filter = Some(Filter::compare(
2303                        FieldRef::column("", "collection"),
2304                        CompareOp::Eq,
2305                        Value::text(collection),
2306                    ));
2307                    Ok(SqlCommand::Select(query))
2308                } else if self.consume_ident_ci("INDICES")? || self.consume_ident_ci("INDEXES")? {
2309                    let mut query = TableQuery::new("red.show_indexes");
2310                    if self.consume(&Token::On)? {
2311                        let collection = self.expect_ident_or_keyword()?;
2312                        let filter = Filter::Compare {
2313                            field: FieldRef::column("", "table"),
2314                            op: CompareOp::Eq,
2315                            value: Value::text(collection),
2316                        };
2317                        query.where_expr = Some(filter_to_expr(&filter));
2318                        query.filter = Some(filter);
2319                    }
2320                    self.parse_table_clauses(&mut query)?;
2321                    Ok(SqlCommand::Select(query))
2322                } else if self.consume_ident_ci("POLICIES")? {
2323                    if self.consume(&Token::For)? || self.consume_ident_ci("FOR")? {
2324                        let principal = self.parse_iam_principal_kind()?;
2325                        return Ok(SqlCommand::IamPolicy(QueryExpr::ShowPolicies {
2326                            filter: Some(principal),
2327                        }));
2328                    }
2329                    let mut query = TableQuery::new("red.policies");
2330                    let collection_filter =
2331                        if self.consume(&Token::On)? || self.consume_ident_ci("ON")? {
2332                            let collection = self.parse_dotted_admin_path(false)?;
2333                            Some(Filter::Compare {
2334                                field: FieldRef::TableColumn {
2335                                    table: String::new(),
2336                                    column: "collection".to_string(),
2337                                },
2338                                op: CompareOp::Eq,
2339                                value: Value::text(collection),
2340                            })
2341                        } else {
2342                            None
2343                        };
2344                    self.parse_table_clauses(&mut query)?;
2345                    if let Some(collection_filter) = collection_filter {
2346                        let combined = match query.filter.take() {
2347                            Some(existing) => {
2348                                Filter::And(Box::new(collection_filter), Box::new(existing))
2349                            }
2350                            None => collection_filter,
2351                        };
2352                        query.where_expr = Some(filter_to_expr(&combined));
2353                        query.filter = Some(combined);
2354                    }
2355                    Ok(SqlCommand::Select(query))
2356                } else if self.consume_ident_ci("STATS")? {
2357                    let mut query = TableQuery::new("red.stats");
2358                    let collection = match self.peek().clone() {
2359                        Token::Ident(name) => {
2360                            self.advance()?;
2361                            Some(name)
2362                        }
2363                        Token::String(name) => {
2364                            self.advance()?;
2365                            Some(name)
2366                        }
2367                        _ => None,
2368                    };
2369                    self.parse_table_clauses(&mut query)?;
2370                    if let Some(collection) = collection {
2371                        let filter = Filter::compare(
2372                            FieldRef::column("red.stats", "collection"),
2373                            CompareOp::Eq,
2374                            Value::text(collection),
2375                        );
2376                        let expr = filter_to_expr(&filter);
2377                        query.where_expr = Some(match query.where_expr.take() {
2378                            Some(existing) => Expr::binop(BinOp::And, existing, expr),
2379                            None => expr,
2380                        });
2381                        query.filter = Some(match query.filter.take() {
2382                            Some(existing) => existing.and(filter),
2383                            None => filter,
2384                        });
2385                    }
2386                    Ok(SqlCommand::Select(query))
2387                } else if self.consume_ident_ci("SAMPLE")? {
2388                    let mut query = TableQuery::new(&self.expect_ident()?);
2389                    query.limit = if self.consume(&Token::Limit)? {
2390                        Some(self.parse_integer()? as u64)
2391                    } else {
2392                        Some(10)
2393                    };
2394                    Ok(SqlCommand::Select(query))
2395                } else if self.consume_ident_ci("SECRET")? || self.consume_ident_ci("SECRETS")? {
2396                    let prefix = if !self.check(&Token::Eof) {
2397                        Some(self.parse_dotted_admin_path(true)?)
2398                    } else {
2399                        None
2400                    };
2401                    Ok(SqlCommand::ShowSecrets { prefix })
2402                } else if self.consume_ident_ci("TENANT")? {
2403                    Ok(SqlCommand::ShowTenant)
2404                } else if let Some(expr) = self.parse_show_iam_after_show()? {
2405                    Ok(SqlCommand::IamPolicy(expr))
2406                } else {
2407                    Err(ParseError::expected(
2408                        vec![
2409                            "CONFIG",
2410                            "SECRET",
2411                            "SECRETS",
2412                            "COLLECTIONS",
2413                            "TABLES",
2414                            "QUEUES",
2415                            "VECTORS",
2416                            "DOCUMENTS",
2417                            "TIMESERIES",
2418                            "GRAPHS",
2419                            "KV",
2420                            "SCHEMA",
2421                            "INDICES",
2422                            "INDEXES",
2423                            "SAMPLE",
2424                            "POLICIES",
2425                            "STATS",
2426                            "TENANT",
2427                            "EFFECTIVE",
2428                        ],
2429                        self.peek(),
2430                        self.position(),
2431                    ))
2432                }
2433            }
2434            // Transaction control statements (Phase 1.1 PG parity).
2435            // BEGIN [WORK | TRANSACTION] [ISOLATION LEVEL <mode>]
2436            // START TRANSACTION [ISOLATION LEVEL <mode>]
2437            //
2438            // We only implement SNAPSHOT ISOLATION (our default). We
2439            // accept READ UNCOMMITTED / READ COMMITTED / REPEATABLE
2440            // READ / SNAPSHOT as PG-compatible no-ops, but reject
2441            // SERIALIZABLE outright — the previous behaviour of
2442            // silently degrading to snapshot made the parser
2443            // dishonest. Real SSI (Serializable Snapshot Isolation)
2444            // is tracked as a future milestone.
2445            Token::Begin | Token::Start => {
2446                self.advance()?;
2447                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2448                // Optional ISOLATION LEVEL clause.
2449                if self.consume_ident_ci("ISOLATION")? {
2450                    self.expect(Token::Level)?;
2451                    // The level identifier can span multiple words
2452                    // (READ UNCOMMITTED / READ COMMITTED / REPEATABLE
2453                    // READ). Collect them case-insensitively.
2454                    let mut parts: Vec<String> = Vec::new();
2455                    if self.consume_ident_ci("READ")? {
2456                        parts.push("READ".to_string());
2457                        if self.consume_ident_ci("UNCOMMITTED")? {
2458                            parts.push("UNCOMMITTED".to_string());
2459                        } else if self.consume_ident_ci("COMMITTED")? {
2460                            parts.push("COMMITTED".to_string());
2461                        } else {
2462                            return Err(ParseError::expected(
2463                                vec!["UNCOMMITTED", "COMMITTED"],
2464                                self.peek(),
2465                                self.position(),
2466                            ));
2467                        }
2468                    } else if self.consume_ident_ci("REPEATABLE")? {
2469                        parts.push("REPEATABLE".to_string());
2470                        if !self.consume_ident_ci("READ")? {
2471                            return Err(ParseError::expected(
2472                                vec!["READ"],
2473                                self.peek(),
2474                                self.position(),
2475                            ));
2476                        }
2477                        parts.push("READ".to_string());
2478                    } else if self.consume_ident_ci("SNAPSHOT")? {
2479                        parts.push("SNAPSHOT".to_string());
2480                    } else if self.consume_ident_ci("SERIALIZABLE")? {
2481                        return Err(ParseError::new(
2482                            "ISOLATION LEVEL SERIALIZABLE is not yet supported — reddb \
2483                             currently provides SNAPSHOT ISOLATION (which PG calls \
2484                             REPEATABLE READ). Use REPEATABLE READ / SNAPSHOT / \
2485                             READ COMMITTED, or omit ISOLATION LEVEL for the default."
2486                                .to_string(),
2487                            self.position(),
2488                        ));
2489                    } else {
2490                        return Err(ParseError::expected(
2491                            vec!["READ", "REPEATABLE", "SNAPSHOT", "SERIALIZABLE"],
2492                            self.peek(),
2493                            self.position(),
2494                        ));
2495                    }
2496                    // All accepted modes map to our snapshot engine today.
2497                    let _ = parts;
2498                }
2499                Ok(SqlCommand::TransactionControl(TxnControl::Begin))
2500            }
2501            // COMMIT [WORK | TRANSACTION]
2502            Token::Commit => {
2503                self.advance()?;
2504                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2505                Ok(SqlCommand::TransactionControl(TxnControl::Commit))
2506            }
2507            // ROLLBACK [WORK | TRANSACTION] [TO [SAVEPOINT] name]
2508            // ROLLBACK MIGRATION name
2509            Token::Rollback => {
2510                self.advance()?;
2511                if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
2512                    match self.parse_rollback_migration_after_keyword()? {
2513                        QueryExpr::RollbackMigration(q) => Ok(SqlCommand::RollbackMigration(q)),
2514                        other => Err(ParseError::new(
2515                            format!(
2516                                "internal: ROLLBACK MIGRATION produced unexpected kind {other:?}"
2517                            ),
2518                            self.position(),
2519                        )),
2520                    }
2521                } else {
2522                    let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2523                    if self.consume(&Token::To)? {
2524                        let _ = self.consume(&Token::Savepoint)?;
2525                        let name = self.expect_ident()?;
2526                        Ok(SqlCommand::TransactionControl(
2527                            TxnControl::RollbackToSavepoint(name),
2528                        ))
2529                    } else {
2530                        Ok(SqlCommand::TransactionControl(TxnControl::Rollback))
2531                    }
2532                }
2533            }
2534            // SAVEPOINT name
2535            Token::Savepoint => {
2536                self.advance()?;
2537                let name = self.expect_ident()?;
2538                Ok(SqlCommand::TransactionControl(TxnControl::Savepoint(name)))
2539            }
2540            // RELEASE [SAVEPOINT] name
2541            Token::Release => {
2542                self.advance()?;
2543                let _ = self.consume(&Token::Savepoint)?;
2544                let name = self.expect_ident()?;
2545                Ok(SqlCommand::TransactionControl(
2546                    TxnControl::ReleaseSavepoint(name),
2547                ))
2548            }
2549            // VACUUM [FULL] [table]
2550            Token::Vacuum => {
2551                self.advance()?;
2552                let full = self.consume(&Token::Full)?;
2553                let target = if self.check(&Token::Eof) {
2554                    None
2555                } else {
2556                    Some(self.expect_ident()?)
2557                };
2558                Ok(SqlCommand::Maintenance(MaintenanceCommand::Vacuum {
2559                    target,
2560                    full,
2561                }))
2562            }
2563            // REFRESH MATERIALIZED VIEW name
2564            Token::Refresh => {
2565                self.advance()?;
2566                self.expect(Token::Materialized)?;
2567                self.expect(Token::View)?;
2568                let name = self.expect_ident()?;
2569                Ok(SqlCommand::RefreshMaterializedView(
2570                    RefreshMaterializedViewQuery { name },
2571                ))
2572            }
2573            // ANALYZE [table]
2574            Token::Analyze => {
2575                self.advance()?;
2576                let target = if self.check(&Token::Eof) {
2577                    None
2578                } else {
2579                    Some(self.expect_ident()?)
2580                };
2581                Ok(SqlCommand::Maintenance(MaintenanceCommand::Analyze {
2582                    target,
2583                }))
2584            }
2585            // COPY table FROM 'path' [WITH (...)] [DELIMITER 'x'] [HEADER]
2586            //
2587            // Accepts both PG-style `WITH (FORMAT csv, HEADER true)` and the
2588            // short-form `DELIMITER ',' HEADER` (a true/false after HEADER is only
2589            // read inside WITH). The only supported format today is CSV.
2590            Token::Copy => {
2591                self.advance()?;
2592                let table = self.expect_ident()?;
2593                self.expect(Token::From)?;
2594                let path = self.parse_string()?;
2595
2596                let mut delimiter: Option<char> = None;
2597                let mut has_header = false;
2598                let format = CopyFormat::Csv;
2599
2600                // Optional `WITH (FORMAT csv, HEADER true, DELIMITER ',')` block.
2601                // `WITH` is a reserved keyword token — accept both the keyword
2602                // form and the ident form that non-CTE callers sometimes emit.
2603                if self.consume(&Token::With)? || self.consume_ident_ci("WITH")? {
2604                    self.expect(Token::LParen)?;
2605                    loop {
2606                        if self.consume(&Token::Format)? || self.consume_ident_ci("FORMAT")? {
2607                            let _ = self.consume(&Token::Eq)?;
2608                            // Only CSV for now — accept the ident and move on.
2609                            let _ = self.expect_ident()?;
2610                        } else if self.consume(&Token::Header)? {
2611                            let _ = self.consume(&Token::Eq)?;
2612                            // Accept `HEADER`, `HEADER = true`, `HEADER = false`,
2613                            // or an ident spelling of true/false.
2614                            has_header = match self.peek().clone() {
2615                                Token::True => {
2616                                    self.advance()?;
2617                                    true
2618                                }
2619                                Token::False => {
2620                                    self.advance()?;
2621                                    false
2622                                }
2623                                Token::Ident(ref n) if n.eq_ignore_ascii_case("true") => {
2624                                    self.advance()?;
2625                                    true
2626                                }
2627                                Token::Ident(ref n) if n.eq_ignore_ascii_case("false") => {
2628                                    self.advance()?;
2629                                    false
2630                                }
2631                                _ => true,
2632                            };
2633                        } else if self.consume(&Token::Delimiter)? {
2634                            let _ = self.consume(&Token::Eq)?;
2635                            let s = self.parse_string()?;
2636                            delimiter = s.chars().next();
2637                        } else {
2638                            break;
2639                        }
2640                        if !self.consume(&Token::Comma)? {
2641                            break;
2642                        }
2643                    }
2644                    self.expect(Token::RParen)?;
2645                }
2646
2647                // Short form clauses outside WITH (in either order).
2648                loop {
2649                    if self.consume(&Token::Delimiter)? {
2650                        let s = self.parse_string()?;
2651                        delimiter = s.chars().next();
2652                    } else if self.consume(&Token::Header)? {
2653                        has_header = true;
2654                    } else {
2655                        break;
2656                    }
2657                }
2658
2659                Ok(SqlCommand::CopyFrom(CopyFromQuery {
2660                    table,
2661                    path,
2662                    format,
2663                    delimiter,
2664                    has_header,
2665                }))
2666            }
2667            other => Err(ParseError::expected(
2668                vec![
2669                    "SELECT",
2670                    "FROM",
2671                    "INSERT",
2672                    "UPDATE",
2673                    "DELETE",
2674                    "EXPLAIN",
2675                    "CREATE",
2676                    "DROP",
2677                    "ALTER",
2678                    "SET",
2679                    "SHOW",
2680                    "BEGIN",
2681                    "COMMIT",
2682                    "ROLLBACK",
2683                    "SAVEPOINT",
2684                    "RELEASE",
2685                    "START",
2686                    "VACUUM",
2687                    "ANALYZE",
2688                    "COPY",
2689                    "REFRESH",
2690                    "DESCRIBE",
2691                    "DESC",
2692                ],
2693                other,
2694                self.position(),
2695            )),
2696        }
2697    }
2698}