Skip to main content

reddb_server/storage/query/
sql.rs

1use crate::catalog::CollectionModel;
2use crate::storage::query::ast::{
3    AlterQueueQuery, AlterTableQuery, AlterUserStmt, ApplyMigrationQuery, AskQuery, BinOp,
4    CompareOp, ConfigCommand, CopyFormat, CopyFromQuery, CreateCollectionQuery,
5    CreateForeignTableQuery, CreateIndexQuery, CreateMigrationQuery, CreatePolicyQuery,
6    CreateQueueQuery, CreateSchemaQuery, CreateSequenceQuery, CreateServerQuery, CreateTableQuery,
7    CreateTimeSeriesQuery, CreateTreeQuery, CreateVectorQuery, CreateViewQuery, DeleteQuery,
8    DropCollectionQuery, DropDocumentQuery, DropForeignTableQuery, DropGraphQuery, DropIndexQuery,
9    DropKvQuery, DropPolicyQuery, DropQueueQuery, DropSchemaQuery, DropSequenceQuery,
10    DropServerQuery, DropTableQuery, DropTimeSeriesQuery, DropTreeQuery, DropVectorQuery,
11    DropViewQuery, EventsBackfillQuery, ExplainAlterQuery, ExplainMigrationQuery, Expr, FieldRef,
12    Filter, ForeignColumnDef, GrantStmt, GraphCommand, GraphQuery, HybridQuery, InsertQuery,
13    JoinQuery, KvCommand, MaintenanceCommand, PathQuery, PolicyAction, ProbabilisticCommand,
14    QueryExpr, QueueCommand, QueueSelectQuery, RefreshMaterializedViewQuery, RevokeStmt,
15    RollbackMigrationQuery, SearchCommand, Span, TableQuery, TreeCommand, TruncateQuery,
16    TxnControl, UpdateQuery, VectorQuery,
17};
18use crate::storage::query::parser::{ParseError, Parser, SafeTokenDisplay};
19use crate::storage::query::sql_lowering::filter_to_expr;
20use crate::storage::query::Token;
21use crate::storage::schema::Value;
22
23/// Canonical SQL frontend command surface.
24///
25/// This is the single entrypoint for SQL/RQL-style commands before they are
26/// lowered into the broader multi-backend `QueryExpr` space.
27#[derive(Debug, Clone)]
28pub enum SqlStatement {
29    Query(SqlQuery),
30    Mutation(SqlMutation),
31    Schema(SqlSchemaCommand),
32    Admin(SqlAdminCommand),
33}
34
35#[derive(Debug, Clone)]
36#[allow(clippy::large_enum_variant)]
37pub enum FrontendStatement {
38    Sql(SqlStatement),
39    Graph(GraphQuery),
40    GraphCommand(GraphCommand),
41    Path(PathQuery),
42    Vector(VectorQuery),
43    Hybrid(HybridQuery),
44    Search(SearchCommand),
45    Ask(AskQuery),
46    QueueSelect(QueueSelectQuery),
47    QueueCommand(QueueCommand),
48    EventsBackfill(EventsBackfillQuery),
49    EventsBackfillStatus { collection: String },
50    TreeCommand(TreeCommand),
51    ProbabilisticCommand(ProbabilisticCommand),
52    KvCommand(KvCommand),
53    ConfigCommand(ConfigCommand),
54}
55
56#[derive(Debug, Clone)]
57pub enum SqlCommand {
58    Select(TableQuery),
59    Join(JoinQuery),
60    Insert(InsertQuery),
61    Update(UpdateQuery),
62    Delete(DeleteQuery),
63    ExplainAlter(ExplainAlterQuery),
64    CreateTable(CreateTableQuery),
65    CreateCollection(CreateCollectionQuery),
66    CreateVector(CreateVectorQuery),
67    DropTable(DropTableQuery),
68    DropGraph(DropGraphQuery),
69    DropVector(DropVectorQuery),
70    DropDocument(DropDocumentQuery),
71    DropKv(DropKvQuery),
72    DropCollection(DropCollectionQuery),
73    Truncate(TruncateQuery),
74    AlterTable(AlterTableQuery),
75    CreateIndex(CreateIndexQuery),
76    DropIndex(DropIndexQuery),
77    CreateTimeSeries(CreateTimeSeriesQuery),
78    DropTimeSeries(DropTimeSeriesQuery),
79    CreateQueue(CreateQueueQuery),
80    AlterQueue(AlterQueueQuery),
81    DropQueue(DropQueueQuery),
82    CreateTree(CreateTreeQuery),
83    DropTree(DropTreeQuery),
84    Probabilistic(ProbabilisticCommand),
85    SetConfig {
86        key: String,
87        value: Value,
88    },
89    ShowConfig {
90        prefix: Option<String>,
91    },
92    SetSecret {
93        key: String,
94        value: Value,
95    },
96    DeleteSecret {
97        key: String,
98    },
99    ShowSecrets {
100        prefix: Option<String>,
101    },
102    SetTenant(Option<String>),
103    ShowTenant,
104    TransactionControl(TxnControl),
105    Maintenance(MaintenanceCommand),
106    CreateSchema(CreateSchemaQuery),
107    DropSchema(DropSchemaQuery),
108    CreateSequence(CreateSequenceQuery),
109    DropSequence(DropSequenceQuery),
110    CopyFrom(CopyFromQuery),
111    CreateView(CreateViewQuery),
112    DropView(DropViewQuery),
113    RefreshMaterializedView(RefreshMaterializedViewQuery),
114    CreatePolicy(CreatePolicyQuery),
115    DropPolicy(DropPolicyQuery),
116    CreateServer(CreateServerQuery),
117    DropServer(DropServerQuery),
118    CreateForeignTable(CreateForeignTableQuery),
119    DropForeignTable(DropForeignTableQuery),
120    /// `GRANT … ON … TO …`
121    Grant(GrantStmt),
122    /// `REVOKE … ON … FROM …`
123    Revoke(RevokeStmt),
124    /// `ALTER USER name <attrs>`
125    AlterUser(AlterUserStmt),
126    /// IAM policy DDL (CREATE POLICY '...' AS '...', DROP POLICY '...',
127    /// ATTACH/DETACH POLICY, SHOW POLICIES, SIMULATE, SHOW EFFECTIVE
128    /// PERMISSIONS). Stored as a pre-built QueryExpr so the dispatcher
129    /// can route the multitude of shapes through a single arm.
130    IamPolicy(QueryExpr),
131    CreateMigration(CreateMigrationQuery),
132    ApplyMigration(ApplyMigrationQuery),
133    RollbackMigration(RollbackMigrationQuery),
134    ExplainMigration(ExplainMigrationQuery),
135}
136
137fn collection_model_filter(model: &str) -> Filter {
138    Filter::Compare {
139        field: FieldRef::column("", "model"),
140        op: CompareOp::Eq,
141        value: Value::Text(model.to_string().into()),
142    }
143}
144
145fn add_table_filter(query: &mut TableQuery, filter: Filter) {
146    let combined = match query.filter.take() {
147        Some(existing) => existing.and(filter),
148        None => filter,
149    };
150    query.where_expr = Some(filter_to_expr(&combined));
151    query.filter = Some(combined);
152}
153
154fn parse_show_collections_by_model(
155    parser: &mut Parser<'_>,
156    model: &str,
157) -> Result<TableQuery, ParseError> {
158    let mut query = TableQuery::new("red.collections");
159    parser.parse_table_clauses(&mut query)?;
160    add_table_filter(&mut query, collection_model_filter(model));
161    Ok(query)
162}
163
164#[derive(Debug, Clone)]
165#[allow(clippy::large_enum_variant)]
166pub enum SqlQuery {
167    Select(TableQuery),
168    Join(JoinQuery),
169}
170
171#[derive(Debug, Clone)]
172pub enum SqlMutation {
173    Insert(InsertQuery),
174    Update(UpdateQuery),
175    Delete(DeleteQuery),
176}
177
178#[derive(Debug, Clone)]
179pub enum SqlSchemaCommand {
180    ExplainAlter(ExplainAlterQuery),
181    CreateTable(CreateTableQuery),
182    CreateCollection(CreateCollectionQuery),
183    CreateVector(CreateVectorQuery),
184    DropTable(DropTableQuery),
185    DropGraph(DropGraphQuery),
186    DropVector(DropVectorQuery),
187    DropDocument(DropDocumentQuery),
188    DropKv(DropKvQuery),
189    DropCollection(DropCollectionQuery),
190    Truncate(TruncateQuery),
191    AlterTable(AlterTableQuery),
192    CreateIndex(CreateIndexQuery),
193    DropIndex(DropIndexQuery),
194    CreateTimeSeries(CreateTimeSeriesQuery),
195    DropTimeSeries(DropTimeSeriesQuery),
196    CreateQueue(CreateQueueQuery),
197    AlterQueue(AlterQueueQuery),
198    DropQueue(DropQueueQuery),
199    CreateTree(CreateTreeQuery),
200    DropTree(DropTreeQuery),
201    Probabilistic(ProbabilisticCommand),
202    CreateSchema(CreateSchemaQuery),
203    DropSchema(DropSchemaQuery),
204    CreateSequence(CreateSequenceQuery),
205    DropSequence(DropSequenceQuery),
206    CopyFrom(CopyFromQuery),
207    CreateView(CreateViewQuery),
208    DropView(DropViewQuery),
209    RefreshMaterializedView(RefreshMaterializedViewQuery),
210    CreatePolicy(CreatePolicyQuery),
211    DropPolicy(DropPolicyQuery),
212    CreateServer(CreateServerQuery),
213    DropServer(DropServerQuery),
214    CreateForeignTable(CreateForeignTableQuery),
215    DropForeignTable(DropForeignTableQuery),
216    CreateMigration(CreateMigrationQuery),
217    ApplyMigration(ApplyMigrationQuery),
218    RollbackMigration(RollbackMigrationQuery),
219    ExplainMigration(ExplainMigrationQuery),
220}
221
222#[derive(Debug, Clone)]
223#[allow(clippy::large_enum_variant)]
224pub enum SqlAdminCommand {
225    SetConfig { key: String, value: Value },
226    ShowConfig { prefix: Option<String> },
227    SetSecret { key: String, value: Value },
228    DeleteSecret { key: String },
229    ShowSecrets { prefix: Option<String> },
230    SetTenant(Option<String>),
231    ShowTenant,
232    TransactionControl(TxnControl),
233    Maintenance(MaintenanceCommand),
234    Grant(GrantStmt),
235    Revoke(RevokeStmt),
236    AlterUser(AlterUserStmt),
237    IamPolicy(QueryExpr),
238}
239
240impl SqlStatement {
241    pub fn into_command(self) -> SqlCommand {
242        match self {
243            SqlStatement::Query(SqlQuery::Select(query)) => SqlCommand::Select(query),
244            SqlStatement::Query(SqlQuery::Join(query)) => SqlCommand::Join(query),
245            SqlStatement::Mutation(SqlMutation::Insert(query)) => SqlCommand::Insert(query),
246            SqlStatement::Mutation(SqlMutation::Update(query)) => SqlCommand::Update(query),
247            SqlStatement::Mutation(SqlMutation::Delete(query)) => SqlCommand::Delete(query),
248            SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query)) => {
249                SqlCommand::ExplainAlter(query)
250            }
251            SqlStatement::Schema(SqlSchemaCommand::CreateTable(query)) => {
252                SqlCommand::CreateTable(query)
253            }
254            SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query)) => {
255                SqlCommand::CreateCollection(query)
256            }
257            SqlStatement::Schema(SqlSchemaCommand::CreateVector(query)) => {
258                SqlCommand::CreateVector(query)
259            }
260            SqlStatement::Schema(SqlSchemaCommand::DropTable(query)) => {
261                SqlCommand::DropTable(query)
262            }
263            SqlStatement::Schema(SqlSchemaCommand::DropGraph(query)) => {
264                SqlCommand::DropGraph(query)
265            }
266            SqlStatement::Schema(SqlSchemaCommand::DropVector(query)) => {
267                SqlCommand::DropVector(query)
268            }
269            SqlStatement::Schema(SqlSchemaCommand::DropDocument(query)) => {
270                SqlCommand::DropDocument(query)
271            }
272            SqlStatement::Schema(SqlSchemaCommand::DropKv(query)) => SqlCommand::DropKv(query),
273            SqlStatement::Schema(SqlSchemaCommand::DropCollection(query)) => {
274                SqlCommand::DropCollection(query)
275            }
276            SqlStatement::Schema(SqlSchemaCommand::Truncate(query)) => SqlCommand::Truncate(query),
277            SqlStatement::Schema(SqlSchemaCommand::AlterTable(query)) => {
278                SqlCommand::AlterTable(query)
279            }
280            SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query)) => {
281                SqlCommand::CreateIndex(query)
282            }
283            SqlStatement::Schema(SqlSchemaCommand::DropIndex(query)) => {
284                SqlCommand::DropIndex(query)
285            }
286            SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query)) => {
287                SqlCommand::CreateTimeSeries(query)
288            }
289            SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query)) => {
290                SqlCommand::DropTimeSeries(query)
291            }
292            SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query)) => {
293                SqlCommand::CreateQueue(query)
294            }
295            SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query)) => {
296                SqlCommand::AlterQueue(query)
297            }
298            SqlStatement::Schema(SqlSchemaCommand::DropQueue(query)) => {
299                SqlCommand::DropQueue(query)
300            }
301            SqlStatement::Schema(SqlSchemaCommand::CreateTree(query)) => {
302                SqlCommand::CreateTree(query)
303            }
304            SqlStatement::Schema(SqlSchemaCommand::DropTree(query)) => SqlCommand::DropTree(query),
305            SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command)) => {
306                SqlCommand::Probabilistic(command)
307            }
308            SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value }) => {
309                SqlCommand::SetConfig { key, value }
310            }
311            SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix }) => {
312                SqlCommand::ShowConfig { prefix }
313            }
314            SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value }) => {
315                SqlCommand::SetSecret { key, value }
316            }
317            SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key }) => {
318                SqlCommand::DeleteSecret { key }
319            }
320            SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix }) => {
321                SqlCommand::ShowSecrets { prefix }
322            }
323            SqlStatement::Admin(SqlAdminCommand::SetTenant(value)) => SqlCommand::SetTenant(value),
324            SqlStatement::Admin(SqlAdminCommand::ShowTenant) => SqlCommand::ShowTenant,
325            SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl)) => {
326                SqlCommand::TransactionControl(ctl)
327            }
328            SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)) => SqlCommand::Maintenance(cmd),
329            SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)) => SqlCommand::CreateSchema(q),
330            SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)) => SqlCommand::DropSchema(q),
331            SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q)) => {
332                SqlCommand::CreateSequence(q)
333            }
334            SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)) => SqlCommand::DropSequence(q),
335            SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)) => SqlCommand::CopyFrom(q),
336            SqlStatement::Schema(SqlSchemaCommand::CreateView(q)) => SqlCommand::CreateView(q),
337            SqlStatement::Schema(SqlSchemaCommand::DropView(q)) => SqlCommand::DropView(q),
338            SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q)) => {
339                SqlCommand::RefreshMaterializedView(q)
340            }
341            SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)) => SqlCommand::CreatePolicy(q),
342            SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)) => SqlCommand::DropPolicy(q),
343            SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)) => SqlCommand::CreateServer(q),
344            SqlStatement::Schema(SqlSchemaCommand::DropServer(q)) => SqlCommand::DropServer(q),
345            SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q)) => {
346                SqlCommand::CreateForeignTable(q)
347            }
348            SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q)) => {
349                SqlCommand::DropForeignTable(q)
350            }
351            SqlStatement::Admin(SqlAdminCommand::Grant(s)) => SqlCommand::Grant(s),
352            SqlStatement::Admin(SqlAdminCommand::Revoke(s)) => SqlCommand::Revoke(s),
353            SqlStatement::Admin(SqlAdminCommand::AlterUser(s)) => SqlCommand::AlterUser(s),
354            SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)) => SqlCommand::IamPolicy(e),
355            SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q)) => {
356                SqlCommand::CreateMigration(q)
357            }
358            SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q)) => {
359                SqlCommand::ApplyMigration(q)
360            }
361            SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q)) => {
362                SqlCommand::RollbackMigration(q)
363            }
364            SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q)) => {
365                SqlCommand::ExplainMigration(q)
366            }
367        }
368    }
369
370    pub fn into_query_expr(self) -> QueryExpr {
371        self.into_command().into_query_expr()
372    }
373}
374
375impl FrontendStatement {
376    pub fn into_query_expr(self) -> QueryExpr {
377        match self {
378            FrontendStatement::Sql(statement) => statement.into_query_expr(),
379            FrontendStatement::Graph(query) => QueryExpr::Graph(query),
380            FrontendStatement::GraphCommand(command) => QueryExpr::GraphCommand(command),
381            FrontendStatement::Path(query) => QueryExpr::Path(query),
382            FrontendStatement::Vector(query) => QueryExpr::Vector(query),
383            FrontendStatement::Hybrid(query) => QueryExpr::Hybrid(query),
384            FrontendStatement::Search(command) => QueryExpr::SearchCommand(command),
385            FrontendStatement::Ask(query) => QueryExpr::Ask(query),
386            FrontendStatement::QueueSelect(query) => QueryExpr::QueueSelect(query),
387            FrontendStatement::QueueCommand(command) => QueryExpr::QueueCommand(command),
388            FrontendStatement::EventsBackfill(query) => QueryExpr::EventsBackfill(query),
389            FrontendStatement::EventsBackfillStatus { collection } => {
390                QueryExpr::EventsBackfillStatus { collection }
391            }
392            FrontendStatement::TreeCommand(command) => QueryExpr::TreeCommand(command),
393            FrontendStatement::ProbabilisticCommand(command) => {
394                QueryExpr::ProbabilisticCommand(command)
395            }
396            FrontendStatement::KvCommand(command) => QueryExpr::KvCommand(command),
397            FrontendStatement::ConfigCommand(command) => QueryExpr::ConfigCommand(command),
398        }
399    }
400}
401
402pub fn parse_frontend(input: &str) -> Result<FrontendStatement, ParseError> {
403    let mut parser = Parser::new(input)?;
404    let statement = parser.parse_frontend_statement()?;
405    if !parser.check(&Token::Eof) {
406        return Err(ParseError::new(
407            // F-05: `Token::Ident` / `Token::String` / `Token::JsonLiteral`
408            // Display arms emit raw user bytes. Render via `{:?}` so
409            // embedded CR/LF/NUL/quotes are escaped before the message
410            // reaches downstream JSON / audit / log / gRPC sinks.
411            format!("Unexpected token after query: {:?}", parser.current.token),
412            parser.position(),
413        ));
414    }
415    Ok(statement)
416}
417
418impl SqlCommand {
419    pub fn into_query_expr(self) -> QueryExpr {
420        match self {
421            SqlCommand::Select(query) => QueryExpr::Table(query),
422            SqlCommand::Join(query) => QueryExpr::Join(query),
423            SqlCommand::Insert(query) => QueryExpr::Insert(query),
424            SqlCommand::Update(query) => QueryExpr::Update(query),
425            SqlCommand::Delete(query) => QueryExpr::Delete(query),
426            SqlCommand::ExplainAlter(query) => QueryExpr::ExplainAlter(query),
427            SqlCommand::CreateTable(query) => QueryExpr::CreateTable(query),
428            SqlCommand::CreateCollection(query) => QueryExpr::CreateCollection(query),
429            SqlCommand::CreateVector(query) => QueryExpr::CreateVector(query),
430            SqlCommand::DropTable(query) => QueryExpr::DropTable(query),
431            SqlCommand::DropGraph(query) => QueryExpr::DropGraph(query),
432            SqlCommand::DropVector(query) => QueryExpr::DropVector(query),
433            SqlCommand::DropDocument(query) => QueryExpr::DropDocument(query),
434            SqlCommand::DropKv(query) => QueryExpr::DropKv(query),
435            SqlCommand::DropCollection(query) => QueryExpr::DropCollection(query),
436            SqlCommand::Truncate(query) => QueryExpr::Truncate(query),
437            SqlCommand::AlterTable(query) => QueryExpr::AlterTable(query),
438            SqlCommand::CreateIndex(query) => QueryExpr::CreateIndex(query),
439            SqlCommand::DropIndex(query) => QueryExpr::DropIndex(query),
440            SqlCommand::CreateTimeSeries(query) => QueryExpr::CreateTimeSeries(query),
441            SqlCommand::DropTimeSeries(query) => QueryExpr::DropTimeSeries(query),
442            SqlCommand::CreateQueue(query) => QueryExpr::CreateQueue(query),
443            SqlCommand::AlterQueue(query) => QueryExpr::AlterQueue(query),
444            SqlCommand::DropQueue(query) => QueryExpr::DropQueue(query),
445            SqlCommand::CreateTree(query) => QueryExpr::CreateTree(query),
446            SqlCommand::DropTree(query) => QueryExpr::DropTree(query),
447            SqlCommand::Probabilistic(command) => QueryExpr::ProbabilisticCommand(command),
448            SqlCommand::SetConfig { key, value } => QueryExpr::SetConfig { key, value },
449            SqlCommand::ShowConfig { prefix } => QueryExpr::ShowConfig { prefix },
450            SqlCommand::SetSecret { key, value } => QueryExpr::SetSecret { key, value },
451            SqlCommand::DeleteSecret { key } => QueryExpr::DeleteSecret { key },
452            SqlCommand::ShowSecrets { prefix } => QueryExpr::ShowSecrets { prefix },
453            SqlCommand::SetTenant(value) => QueryExpr::SetTenant(value),
454            SqlCommand::ShowTenant => QueryExpr::ShowTenant,
455            SqlCommand::TransactionControl(ctl) => QueryExpr::TransactionControl(ctl),
456            SqlCommand::Maintenance(cmd) => QueryExpr::MaintenanceCommand(cmd),
457            SqlCommand::CreateSchema(q) => QueryExpr::CreateSchema(q),
458            SqlCommand::DropSchema(q) => QueryExpr::DropSchema(q),
459            SqlCommand::CreateSequence(q) => QueryExpr::CreateSequence(q),
460            SqlCommand::DropSequence(q) => QueryExpr::DropSequence(q),
461            SqlCommand::CopyFrom(q) => QueryExpr::CopyFrom(q),
462            SqlCommand::CreateView(q) => QueryExpr::CreateView(q),
463            SqlCommand::DropView(q) => QueryExpr::DropView(q),
464            SqlCommand::RefreshMaterializedView(q) => QueryExpr::RefreshMaterializedView(q),
465            SqlCommand::CreatePolicy(q) => QueryExpr::CreatePolicy(q),
466            SqlCommand::DropPolicy(q) => QueryExpr::DropPolicy(q),
467            SqlCommand::CreateServer(q) => QueryExpr::CreateServer(q),
468            SqlCommand::DropServer(q) => QueryExpr::DropServer(q),
469            SqlCommand::CreateForeignTable(q) => QueryExpr::CreateForeignTable(q),
470            SqlCommand::DropForeignTable(q) => QueryExpr::DropForeignTable(q),
471            SqlCommand::Grant(s) => QueryExpr::Grant(s),
472            SqlCommand::Revoke(s) => QueryExpr::Revoke(s),
473            SqlCommand::AlterUser(s) => QueryExpr::AlterUser(s),
474            SqlCommand::IamPolicy(e) => e,
475            SqlCommand::CreateMigration(q) => QueryExpr::CreateMigration(q),
476            SqlCommand::ApplyMigration(q) => QueryExpr::ApplyMigration(q),
477            SqlCommand::RollbackMigration(q) => QueryExpr::RollbackMigration(q),
478            SqlCommand::ExplainMigration(q) => QueryExpr::ExplainMigration(q),
479        }
480    }
481
482    pub fn into_statement(self) -> SqlStatement {
483        match self {
484            SqlCommand::Select(query) => SqlStatement::Query(SqlQuery::Select(query)),
485            SqlCommand::Join(query) => SqlStatement::Query(SqlQuery::Join(query)),
486            SqlCommand::Insert(query) => SqlStatement::Mutation(SqlMutation::Insert(query)),
487            SqlCommand::Update(query) => SqlStatement::Mutation(SqlMutation::Update(query)),
488            SqlCommand::Delete(query) => SqlStatement::Mutation(SqlMutation::Delete(query)),
489            SqlCommand::ExplainAlter(query) => {
490                SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query))
491            }
492            SqlCommand::CreateTable(query) => {
493                SqlStatement::Schema(SqlSchemaCommand::CreateTable(query))
494            }
495            SqlCommand::CreateCollection(query) => {
496                SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query))
497            }
498            SqlCommand::CreateVector(query) => {
499                SqlStatement::Schema(SqlSchemaCommand::CreateVector(query))
500            }
501            SqlCommand::DropTable(query) => {
502                SqlStatement::Schema(SqlSchemaCommand::DropTable(query))
503            }
504            SqlCommand::DropGraph(query) => {
505                SqlStatement::Schema(SqlSchemaCommand::DropGraph(query))
506            }
507            SqlCommand::DropVector(query) => {
508                SqlStatement::Schema(SqlSchemaCommand::DropVector(query))
509            }
510            SqlCommand::DropDocument(query) => {
511                SqlStatement::Schema(SqlSchemaCommand::DropDocument(query))
512            }
513            SqlCommand::DropKv(query) => SqlStatement::Schema(SqlSchemaCommand::DropKv(query)),
514            SqlCommand::DropCollection(query) => {
515                SqlStatement::Schema(SqlSchemaCommand::DropCollection(query))
516            }
517            SqlCommand::Truncate(query) => SqlStatement::Schema(SqlSchemaCommand::Truncate(query)),
518            SqlCommand::AlterTable(query) => {
519                SqlStatement::Schema(SqlSchemaCommand::AlterTable(query))
520            }
521            SqlCommand::CreateIndex(query) => {
522                SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query))
523            }
524            SqlCommand::DropIndex(query) => {
525                SqlStatement::Schema(SqlSchemaCommand::DropIndex(query))
526            }
527            SqlCommand::CreateTimeSeries(query) => {
528                SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query))
529            }
530            SqlCommand::DropTimeSeries(query) => {
531                SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query))
532            }
533            SqlCommand::CreateQueue(query) => {
534                SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query))
535            }
536            SqlCommand::AlterQueue(query) => {
537                SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query))
538            }
539            SqlCommand::DropQueue(query) => {
540                SqlStatement::Schema(SqlSchemaCommand::DropQueue(query))
541            }
542            SqlCommand::CreateTree(query) => {
543                SqlStatement::Schema(SqlSchemaCommand::CreateTree(query))
544            }
545            SqlCommand::DropTree(query) => SqlStatement::Schema(SqlSchemaCommand::DropTree(query)),
546            SqlCommand::Probabilistic(command) => {
547                SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command))
548            }
549            SqlCommand::SetConfig { key, value } => {
550                SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value })
551            }
552            SqlCommand::ShowConfig { prefix } => {
553                SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix })
554            }
555            SqlCommand::SetSecret { key, value } => {
556                SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value })
557            }
558            SqlCommand::DeleteSecret { key } => {
559                SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key })
560            }
561            SqlCommand::ShowSecrets { prefix } => {
562                SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix })
563            }
564            SqlCommand::SetTenant(value) => SqlStatement::Admin(SqlAdminCommand::SetTenant(value)),
565            SqlCommand::ShowTenant => SqlStatement::Admin(SqlAdminCommand::ShowTenant),
566            SqlCommand::TransactionControl(ctl) => {
567                SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl))
568            }
569            SqlCommand::Maintenance(cmd) => SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)),
570            SqlCommand::CreateSchema(q) => SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)),
571            SqlCommand::DropSchema(q) => SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)),
572            SqlCommand::CreateSequence(q) => {
573                SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q))
574            }
575            SqlCommand::DropSequence(q) => SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)),
576            SqlCommand::CopyFrom(q) => SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)),
577            SqlCommand::CreateView(q) => SqlStatement::Schema(SqlSchemaCommand::CreateView(q)),
578            SqlCommand::DropView(q) => SqlStatement::Schema(SqlSchemaCommand::DropView(q)),
579            SqlCommand::RefreshMaterializedView(q) => {
580                SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q))
581            }
582            SqlCommand::CreatePolicy(q) => SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)),
583            SqlCommand::DropPolicy(q) => SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)),
584            SqlCommand::CreateServer(q) => SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)),
585            SqlCommand::DropServer(q) => SqlStatement::Schema(SqlSchemaCommand::DropServer(q)),
586            SqlCommand::CreateForeignTable(q) => {
587                SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q))
588            }
589            SqlCommand::DropForeignTable(q) => {
590                SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q))
591            }
592            SqlCommand::Grant(s) => SqlStatement::Admin(SqlAdminCommand::Grant(s)),
593            SqlCommand::Revoke(s) => SqlStatement::Admin(SqlAdminCommand::Revoke(s)),
594            SqlCommand::AlterUser(s) => SqlStatement::Admin(SqlAdminCommand::AlterUser(s)),
595            SqlCommand::IamPolicy(e) => SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)),
596            SqlCommand::CreateMigration(q) => {
597                SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q))
598            }
599            SqlCommand::ApplyMigration(q) => {
600                SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q))
601            }
602            SqlCommand::RollbackMigration(q) => {
603                SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q))
604            }
605            SqlCommand::ExplainMigration(q) => {
606                SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q))
607            }
608        }
609    }
610}
611
612impl<'a> Parser<'a> {
613    fn parse_events_command(&mut self) -> Result<QueryExpr, ParseError> {
614        self.expect_ident()?; // EVENTS
615        if self.consume_ident_ci("STATUS")? {
616            let mut query = TableQuery::new("red.subscriptions");
617            let collection = match self.peek().clone() {
618                Token::Ident(name) => {
619                    self.advance()?;
620                    Some(name)
621                }
622                Token::String(name) => {
623                    self.advance()?;
624                    Some(name)
625                }
626                _ => None,
627            };
628            self.parse_table_clauses(&mut query)?;
629            if let Some(collection) = collection {
630                let filter = Filter::compare(
631                    FieldRef::column("red.subscriptions", "collection"),
632                    CompareOp::Eq,
633                    Value::text(collection),
634                );
635                let expr = filter_to_expr(&filter);
636                query.where_expr = Some(match query.where_expr.take() {
637                    Some(existing) => Expr::binop(BinOp::And, existing, expr),
638                    None => expr,
639                });
640                query.filter = Some(match query.filter.take() {
641                    Some(existing) => existing.and(filter),
642                    None => filter,
643                });
644            }
645            return Ok(QueryExpr::Table(query));
646        }
647
648        if !self.consume_ident_ci("BACKFILL")? {
649            return Err(ParseError::expected(
650                vec!["BACKFILL", "STATUS"],
651                self.peek(),
652                self.position(),
653            ));
654        }
655
656        if self.consume_ident_ci("STATUS")? {
657            let collection = self.expect_ident()?;
658            return Ok(QueryExpr::EventsBackfillStatus { collection });
659        }
660
661        let collection = self.expect_ident()?;
662        let where_filter = if self.consume(&Token::Where)? {
663            let mut parts = Vec::new();
664            while !self.check(&Token::Eof) && !self.check(&Token::To) {
665                parts.push(self.peek().to_string());
666                self.advance()?;
667            }
668            if parts.is_empty() {
669                return Err(ParseError::expected(
670                    vec!["predicate"],
671                    self.peek(),
672                    self.position(),
673                ));
674            }
675            Some(parts.join(" "))
676        } else {
677            None
678        };
679
680        self.expect(Token::To)?;
681        let target_queue = self.expect_ident()?;
682        let limit = if self.consume(&Token::Limit)? {
683            Some(self.parse_positive_integer("LIMIT")? as u64)
684        } else {
685            None
686        };
687
688        Ok(QueryExpr::EventsBackfill(EventsBackfillQuery {
689            collection,
690            where_filter,
691            target_queue,
692            limit,
693        }))
694    }
695
696    /// Parse an optional `OPTIONS (key 'value', key2 'value2', ...)` clause
697    /// used by Phase 3.2 FDW DDL statements. Returns an empty vec when the
698    /// clause is absent. Values are always single-quoted string literals —
699    /// consistent with PG's generic-options model.
700    pub(crate) fn parse_fdw_options_clause(&mut self) -> Result<Vec<(String, String)>, ParseError> {
701        if !self.consume(&Token::Options)? {
702            return Ok(Vec::new());
703        }
704        self.expect(Token::LParen)?;
705        let mut out: Vec<(String, String)> = Vec::new();
706        loop {
707            // Option keys frequently collide with reserved words
708            // (`path`, `format`, `delimiter`, `header`, …) — accept
709            // the keyword form and lowercase it so downstream
710            // option-name matching stays case-insensitive.
711            let was_ident = matches!(self.peek(), Token::Ident(_));
712            let raw = self.expect_ident_or_keyword()?;
713            let key = if was_ident {
714                raw
715            } else {
716                raw.to_ascii_lowercase()
717            };
718            // Value is a single-quoted string literal.
719            let value = self.parse_string()?;
720            out.push((key, value));
721            if !self.consume(&Token::Comma)? {
722                break;
723            }
724        }
725        self.expect(Token::RParen)?;
726        Ok(out)
727    }
728
729    /// Parse any top-level frontend statement through a single shared surface.
730    pub fn parse_frontend_statement(&mut self) -> Result<FrontendStatement, ParseError> {
731        match self.peek() {
732            Token::Select => match self.parse_select_query()? {
733                QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
734                    SqlQuery::Select(query),
735                ))),
736                QueryExpr::Join(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
737                    SqlQuery::Join(query),
738                ))),
739                QueryExpr::QueueSelect(query) => Ok(FrontendStatement::QueueSelect(query)),
740                other => Err(ParseError::new(
741                    format!("internal: SELECT produced unexpected query kind {other:?}"),
742                    self.position(),
743                )),
744            },
745            Token::From
746            | Token::Insert
747            | Token::Update
748            | Token::Truncate
749            | Token::Create
750            | Token::Drop
751            | Token::Alter
752            | Token::Set
753            | Token::Begin
754            | Token::Commit
755            | Token::Rollback
756            | Token::Savepoint
757            | Token::Release
758            | Token::Start
759            | Token::Vacuum
760            | Token::Analyze
761            | Token::Copy
762            | Token::Refresh => self.parse_sql_statement().map(FrontendStatement::Sql),
763            Token::Explain => {
764                if matches!(
765                    self.peek_next()?,
766                    Token::Ident(name) if name.eq_ignore_ascii_case("ASK")
767                ) {
768                    match self.parse_explain_ask_query()? {
769                        QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
770                        other => Err(ParseError::new(
771                            format!(
772                                "internal: EXPLAIN ASK produced unexpected query kind {other:?}"
773                            ),
774                            self.position(),
775                        )),
776                    }
777                } else {
778                    self.parse_sql_statement().map(FrontendStatement::Sql)
779                }
780            }
781            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
782                self.parse_sql_statement().map(FrontendStatement::Sql)
783            }
784            Token::Desc => self.parse_sql_statement().map(FrontendStatement::Sql),
785            Token::Ident(name)
786                if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
787            {
788                self.parse_sql_statement().map(FrontendStatement::Sql)
789            }
790            Token::Ident(name)
791                if name.eq_ignore_ascii_case("GRANT")
792                    || name.eq_ignore_ascii_case("REVOKE")
793                    || name.eq_ignore_ascii_case("SIMULATE")
794                    || name.eq_ignore_ascii_case("APPLY") =>
795            {
796                self.parse_sql_statement().map(FrontendStatement::Sql)
797            }
798            Token::Ident(name) if name.eq_ignore_ascii_case("WATCH") => {
799                self.advance()?;
800                if matches!(
801                    self.peek(),
802                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
803                ) {
804                    match self.parse_config_watch_after_watch()? {
805                        QueryExpr::ConfigCommand(command) => {
806                            Ok(FrontendStatement::ConfigCommand(command))
807                        }
808                        other => Err(ParseError::new(
809                            format!(
810                                "internal: WATCH CONFIG produced unexpected query kind {other:?}"
811                            ),
812                            self.position(),
813                        )),
814                    }
815                } else if matches!(
816                    self.peek(),
817                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
818                ) {
819                    match self.parse_vault_watch_after_watch()? {
820                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
821                        other => Err(ParseError::new(
822                            format!(
823                                "internal: WATCH VAULT produced unexpected query kind {other:?}"
824                            ),
825                            self.position(),
826                        )),
827                    }
828                } else {
829                    match self.parse_kv_watch(crate::catalog::CollectionModel::Kv)? {
830                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
831                        other => Err(ParseError::new(
832                            format!("internal: WATCH produced unexpected query kind {other:?}"),
833                            self.position(),
834                        )),
835                    }
836                }
837            }
838            Token::List => {
839                self.advance()?;
840                if matches!(
841                    self.peek(),
842                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
843                ) {
844                    match self.parse_config_list_after_list()? {
845                        QueryExpr::ConfigCommand(command) => {
846                            Ok(FrontendStatement::ConfigCommand(command))
847                        }
848                        other => Err(ParseError::new(
849                            format!(
850                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
851                            ),
852                            self.position(),
853                        )),
854                    }
855                } else if matches!(
856                    self.peek(),
857                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
858                ) {
859                    match self.parse_vault_list_after_list()? {
860                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
861                        other => Err(ParseError::new(
862                            format!(
863                                "internal: LIST VAULT produced unexpected query kind {other:?}"
864                            ),
865                            self.position(),
866                        )),
867                    }
868                } else {
869                    Err(ParseError::expected(
870                        vec!["CONFIG", "VAULT"],
871                        self.peek(),
872                        self.position(),
873                    ))
874                }
875            }
876            Token::Ident(name) if name.eq_ignore_ascii_case("LIST") => {
877                self.advance()?;
878                if matches!(
879                    self.peek(),
880                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
881                ) {
882                    match self.parse_config_list_after_list()? {
883                        QueryExpr::ConfigCommand(command) => {
884                            Ok(FrontendStatement::ConfigCommand(command))
885                        }
886                        other => Err(ParseError::new(
887                            format!(
888                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
889                            ),
890                            self.position(),
891                        )),
892                    }
893                } else if matches!(
894                    self.peek(),
895                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
896                ) {
897                    match self.parse_vault_list_after_list()? {
898                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
899                        other => Err(ParseError::new(
900                            format!(
901                                "internal: LIST VAULT produced unexpected query kind {other:?}"
902                            ),
903                            self.position(),
904                        )),
905                    }
906                } else {
907                    Err(ParseError::expected(
908                        vec!["CONFIG", "VAULT"],
909                        self.peek(),
910                        self.position(),
911                    ))
912                }
913            }
914            Token::Ident(name) if name.eq_ignore_ascii_case("INVALIDATE") => {
915                if matches!(
916                    self.peek_next()?,
917                    Token::Ident(next) if next.eq_ignore_ascii_case("CONFIG")
918                ) {
919                    match self.parse_config_command()? {
920                        QueryExpr::ConfigCommand(command) => {
921                            Ok(FrontendStatement::ConfigCommand(command))
922                        }
923                        other => Err(ParseError::new(
924                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
925                            self.position(),
926                        )),
927                    }
928                } else {
929                    self.advance()?;
930                    match self.parse_kv_invalidate_tags_after_invalidate()? {
931                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
932                        other => Err(ParseError::new(
933                            format!(
934                                "internal: INVALIDATE produced unexpected query kind {other:?}"
935                            ),
936                            self.position(),
937                        )),
938                    }
939                }
940            }
941            Token::Attach | Token::Detach => self.parse_sql_statement().map(FrontendStatement::Sql),
942            Token::Match => match self.parse_match_query()? {
943                QueryExpr::Graph(query) => Ok(FrontendStatement::Graph(query)),
944                other => Err(ParseError::new(
945                    format!("internal: MATCH produced unexpected query kind {other:?}"),
946                    self.position(),
947                )),
948            },
949            Token::Path => match self.parse_path_query()? {
950                QueryExpr::Path(query) => Ok(FrontendStatement::Path(query)),
951                other => Err(ParseError::new(
952                    format!("internal: PATH produced unexpected query kind {other:?}"),
953                    self.position(),
954                )),
955            },
956            Token::Vector => match self.parse_vector_query()? {
957                QueryExpr::Vector(query) => Ok(FrontendStatement::Vector(query)),
958                other => Err(ParseError::new(
959                    format!("internal: VECTOR produced unexpected query kind {other:?}"),
960                    self.position(),
961                )),
962            },
963            Token::Hybrid => match self.parse_hybrid_query()? {
964                QueryExpr::Hybrid(query) => Ok(FrontendStatement::Hybrid(query)),
965                other => Err(ParseError::new(
966                    format!("internal: HYBRID produced unexpected query kind {other:?}"),
967                    self.position(),
968                )),
969            },
970            Token::Graph => match self.parse_graph_command()? {
971                QueryExpr::GraphCommand(command) => Ok(FrontendStatement::GraphCommand(command)),
972                other => Err(ParseError::new(
973                    format!("internal: GRAPH produced unexpected query kind {other:?}"),
974                    self.position(),
975                )),
976            },
977            Token::Search => match self.parse_search_command()? {
978                QueryExpr::SearchCommand(command) => Ok(FrontendStatement::Search(command)),
979                other => Err(ParseError::new(
980                    format!("internal: SEARCH produced unexpected query kind {other:?}"),
981                    self.position(),
982                )),
983            },
984            Token::Ident(name) if name.eq_ignore_ascii_case("ASK") => {
985                match self.parse_ask_query()? {
986                    QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
987                    other => Err(ParseError::new(
988                        format!("internal: ASK produced unexpected query kind {other:?}"),
989                        self.position(),
990                    )),
991                }
992            }
993            Token::Ident(name) if name.eq_ignore_ascii_case("UNSEAL") => {
994                match self.parse_unseal_vault_command()? {
995                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
996                    other => Err(ParseError::new(
997                        format!("internal: UNSEAL VAULT produced unexpected query kind {other:?}"),
998                        self.position(),
999                    )),
1000                }
1001            }
1002            Token::Queue => match self.parse_queue_command()? {
1003                QueryExpr::QueueCommand(command) => Ok(FrontendStatement::QueueCommand(command)),
1004                other => Err(ParseError::new(
1005                    format!("internal: QUEUE produced unexpected query kind {other:?}"),
1006                    self.position(),
1007                )),
1008            },
1009            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
1010                match self.parse_events_command()? {
1011                    QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
1012                        SqlQuery::Select(query),
1013                    ))),
1014                    QueryExpr::EventsBackfill(query) => {
1015                        Ok(FrontendStatement::EventsBackfill(query))
1016                    }
1017                    QueryExpr::EventsBackfillStatus { collection } => {
1018                        Ok(FrontendStatement::EventsBackfillStatus { collection })
1019                    }
1020                    other => Err(ParseError::new(
1021                        format!("internal: EVENTS produced unexpected query kind {other:?}"),
1022                        self.position(),
1023                    )),
1024                }
1025            }
1026            Token::Kv => match self.parse_kv_command()? {
1027                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1028                other => Err(ParseError::new(
1029                    format!("internal: KV produced unexpected query kind {other:?}"),
1030                    self.position(),
1031                )),
1032            },
1033            Token::Delete => {
1034                if matches!(
1035                    self.peek_next()?,
1036                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
1037                ) {
1038                    match self.parse_config_command()? {
1039                        QueryExpr::ConfigCommand(command) => {
1040                            Ok(FrontendStatement::ConfigCommand(command))
1041                        }
1042                        other => Err(ParseError::new(
1043                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1044                            self.position(),
1045                        )),
1046                    }
1047                } else if matches!(
1048                    self.peek_next()?,
1049                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
1050                ) {
1051                    match self.parse_vault_lifecycle_command()? {
1052                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1053                        other => Err(ParseError::new(
1054                            format!("internal: VAULT produced unexpected query kind {other:?}"),
1055                            self.position(),
1056                        )),
1057                    }
1058                } else {
1059                    self.parse_sql_statement().map(FrontendStatement::Sql)
1060                }
1061            }
1062            Token::Add => match self.parse_config_command()? {
1063                QueryExpr::ConfigCommand(command) => Ok(FrontendStatement::ConfigCommand(command)),
1064                other => Err(ParseError::new(
1065                    format!("internal: CONFIG produced unexpected query kind {other:?}"),
1066                    self.position(),
1067                )),
1068            },
1069            Token::Purge => match self.parse_vault_lifecycle_command()? {
1070                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1071                other => Err(ParseError::new(
1072                    format!("internal: VAULT produced unexpected query kind {other:?}"),
1073                    self.position(),
1074                )),
1075            },
1076            Token::Ident(name)
1077                if name.eq_ignore_ascii_case("PUT")
1078                    || name.eq_ignore_ascii_case("GET")
1079                    || name.eq_ignore_ascii_case("RESOLVE")
1080                    || name.eq_ignore_ascii_case("ROTATE")
1081                    || name.eq_ignore_ascii_case("HISTORY")
1082                    || name.eq_ignore_ascii_case("PURGE")
1083                    || name.eq_ignore_ascii_case("INCR")
1084                    || name.eq_ignore_ascii_case("DECR")
1085                    || name.eq_ignore_ascii_case("INVALIDATE") =>
1086            {
1087                if matches!(
1088                    self.peek_next()?,
1089                    Token::Ident(next) if next.eq_ignore_ascii_case("VAULT")
1090                ) {
1091                    match self.parse_vault_lifecycle_command()? {
1092                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1093                        other => Err(ParseError::new(
1094                            format!("internal: VAULT produced unexpected query kind {other:?}"),
1095                            self.position(),
1096                        )),
1097                    }
1098                } else {
1099                    match self.parse_config_command()? {
1100                        QueryExpr::ConfigCommand(command) => {
1101                            Ok(FrontendStatement::ConfigCommand(command))
1102                        }
1103                        other => Err(ParseError::new(
1104                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1105                            self.position(),
1106                        )),
1107                    }
1108                }
1109            }
1110            Token::Ident(name) if name.eq_ignore_ascii_case("VAULT") => {
1111                match self.parse_vault_command()? {
1112                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1113                    other => Err(ParseError::new(
1114                        format!("internal: VAULT produced unexpected query kind {other:?}"),
1115                        self.position(),
1116                    )),
1117                }
1118            }
1119            Token::Tree => match self.parse_tree_command()? {
1120                QueryExpr::TreeCommand(command) => Ok(FrontendStatement::TreeCommand(command)),
1121                other => Err(ParseError::new(
1122                    format!("internal: TREE produced unexpected query kind {other:?}"),
1123                    self.position(),
1124                )),
1125            },
1126            Token::Ident(name) if name.eq_ignore_ascii_case("HLL") => {
1127                match self.parse_hll_command()? {
1128                    QueryExpr::ProbabilisticCommand(command) => {
1129                        Ok(FrontendStatement::ProbabilisticCommand(command))
1130                    }
1131                    other => Err(ParseError::new(
1132                        format!("internal: HLL produced unexpected query kind {other:?}"),
1133                        self.position(),
1134                    )),
1135                }
1136            }
1137            Token::Ident(name) if name.eq_ignore_ascii_case("SKETCH") => {
1138                match self.parse_sketch_command()? {
1139                    QueryExpr::ProbabilisticCommand(command) => {
1140                        Ok(FrontendStatement::ProbabilisticCommand(command))
1141                    }
1142                    other => Err(ParseError::new(
1143                        format!("internal: SKETCH produced unexpected query kind {other:?}"),
1144                        self.position(),
1145                    )),
1146                }
1147            }
1148            Token::Ident(name) if name.eq_ignore_ascii_case("FILTER") => {
1149                match self.parse_filter_command()? {
1150                    QueryExpr::ProbabilisticCommand(command) => {
1151                        Ok(FrontendStatement::ProbabilisticCommand(command))
1152                    }
1153                    other => Err(ParseError::new(
1154                        format!("internal: FILTER produced unexpected query kind {other:?}"),
1155                        self.position(),
1156                    )),
1157                }
1158            }
1159            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => self
1160                .parse_sql_command()
1161                .map(SqlCommand::into_statement)
1162                .map(FrontendStatement::Sql),
1163            other => Err(ParseError::expected(
1164                vec![
1165                    "SELECT", "MATCH", "PATH", "FROM", "VECTOR", "HYBRID", "INSERT", "UPDATE",
1166                    "DELETE", "TRUNCATE", "CREATE", "DROP", "ALTER", "GRAPH", "SEARCH", "ASK",
1167                    "QUEUE", "EVENTS", "KV", "HLL", "TREE", "SKETCH", "FILTER", "SET", "SHOW",
1168                    "DESCRIBE", "DESC",
1169                ],
1170                other,
1171                self.position(),
1172            )),
1173        }
1174    }
1175
1176    /// Parse any SQL/RQL-style command into the canonical SQL frontend IR.
1177    pub fn parse_sql_statement(&mut self) -> Result<SqlStatement, ParseError> {
1178        self.parse_sql_command().map(SqlCommand::into_statement)
1179    }
1180
1181    fn parse_dotted_admin_path(&mut self, lowercase: bool) -> Result<String, ParseError> {
1182        let mut path = self.expect_ident()?;
1183        while self.consume(&Token::Dot)? {
1184            let next = self.expect_ident_or_keyword()?;
1185            path = format!("{path}.{next}");
1186        }
1187        Ok(if lowercase {
1188            path.to_ascii_lowercase()
1189        } else {
1190            path
1191        })
1192    }
1193
1194    /// Parse any SQL/RQL-style command through a single frontend module.
1195    pub fn parse_sql_command(&mut self) -> Result<SqlCommand, ParseError> {
1196        match self.peek() {
1197            Token::Select => match self.parse_select_query()? {
1198                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1199                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1200                other => Err(ParseError::new(
1201                    format!("internal: SELECT produced unexpected query kind {other:?}"),
1202                    self.position(),
1203                )),
1204            },
1205            Token::From => match self.parse_from_query()? {
1206                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1207                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1208                other => Err(ParseError::new(
1209                    format!("internal: FROM produced unexpected query kind {other:?}"),
1210                    self.position(),
1211                )),
1212            },
1213            Token::Insert => match self.parse_insert_query()? {
1214                QueryExpr::Insert(query) => Ok(SqlCommand::Insert(query)),
1215                other => Err(ParseError::new(
1216                    format!("internal: INSERT produced unexpected query kind {other:?}"),
1217                    self.position(),
1218                )),
1219            },
1220            Token::Update => match self.parse_update_query()? {
1221                QueryExpr::Update(query) => Ok(SqlCommand::Update(query)),
1222                other => Err(ParseError::new(
1223                    format!("internal: UPDATE produced unexpected query kind {other:?}"),
1224                    self.position(),
1225                )),
1226            },
1227            Token::Delete => {
1228                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("SECRET"))
1229                {
1230                    self.advance()?; // DELETE
1231                    self.advance()?; // SECRET
1232                    let key = self.parse_dotted_admin_path(true)?;
1233                    Ok(SqlCommand::DeleteSecret { key })
1234                } else {
1235                    match self.parse_delete_query()? {
1236                        QueryExpr::Delete(query) => Ok(SqlCommand::Delete(query)),
1237                        other => Err(ParseError::new(
1238                            format!("internal: DELETE produced unexpected query kind {other:?}"),
1239                            self.position(),
1240                        )),
1241                    }
1242                }
1243            }
1244            Token::Truncate => {
1245                self.advance()?;
1246                let model = if self.consume(&Token::Table)? {
1247                    Some(CollectionModel::Table)
1248                } else if self.consume(&Token::Graph)? {
1249                    Some(CollectionModel::Graph)
1250                } else if self.consume(&Token::Vector)? {
1251                    Some(CollectionModel::Vector)
1252                } else if self.consume(&Token::Document)? {
1253                    Some(CollectionModel::Document)
1254                } else if self.consume(&Token::Timeseries)? {
1255                    Some(CollectionModel::TimeSeries)
1256                } else if self.consume(&Token::Kv)? {
1257                    Some(CollectionModel::Kv)
1258                } else if self.consume(&Token::Queue)? {
1259                    Some(CollectionModel::Queue)
1260                } else if self.consume(&Token::Collection)? {
1261                    None
1262                } else {
1263                    return Err(ParseError::expected(
1264                        vec![
1265                            "TABLE",
1266                            "GRAPH",
1267                            "VECTOR",
1268                            "DOCUMENT",
1269                            "TIMESERIES",
1270                            "KV",
1271                            "QUEUE",
1272                            "COLLECTION",
1273                        ],
1274                        self.peek(),
1275                        self.position(),
1276                    ));
1277                };
1278                match self.parse_truncate_body(model)? {
1279                    QueryExpr::Truncate(query) => Ok(SqlCommand::Truncate(query)),
1280                    other => Err(ParseError::new(
1281                        format!("internal: TRUNCATE produced unexpected kind {other:?}"),
1282                        self.position(),
1283                    )),
1284                }
1285            }
1286            Token::Explain => {
1287                // Peek ahead: EXPLAIN MIGRATION name → ExplainMigration
1288                // EXPLAIN ALTER FOR ... → ExplainAlter (existing path)
1289                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
1290                {
1291                    self.advance()?; // consume EXPLAIN
1292                    match self.parse_explain_migration_after_keyword()? {
1293                        QueryExpr::ExplainMigration(q) => Ok(SqlCommand::ExplainMigration(q)),
1294                        other => Err(ParseError::new(
1295                            format!(
1296                                "internal: EXPLAIN MIGRATION produced unexpected kind {other:?}"
1297                            ),
1298                            self.position(),
1299                        )),
1300                    }
1301                } else {
1302                    match self.parse_explain_alter_query()? {
1303                        QueryExpr::ExplainAlter(query) => Ok(SqlCommand::ExplainAlter(query)),
1304                        other => Err(ParseError::new(
1305                            format!("internal: EXPLAIN produced unexpected query kind {other:?}"),
1306                            self.position(),
1307                        )),
1308                    }
1309                }
1310            }
1311            Token::Create => {
1312                let pos = self.position();
1313                self.advance()?;
1314
1315                // CREATE [OR REPLACE] [MATERIALIZED] VIEW [IF NOT EXISTS] name AS <select>
1316                // Detect the VIEW path early so OR REPLACE / MATERIALIZED modifiers
1317                // don't collide with other CREATE variants (TABLE, INDEX, etc.).
1318                let mut or_replace = false;
1319                if self.consume(&Token::Or)? || self.consume_ident_ci("OR")? {
1320                    let _ = self.consume_ident_ci("REPLACE")?;
1321                    or_replace = true;
1322                }
1323                let materialized = self.consume(&Token::Materialized)?;
1324                if self.check(&Token::View) {
1325                    self.advance()?;
1326                    let if_not_exists = self.match_if_not_exists()?;
1327                    let name = self.expect_ident()?;
1328                    // Accept `AS` — the lexer promotes it to `Token::As`
1329                    // (keyword) but some paths still see it as an ident.
1330                    if !self.consume(&Token::As)? && !self.consume_ident_ci("AS")? {
1331                        return Err(ParseError::expected(
1332                            vec!["AS"],
1333                            self.peek(),
1334                            self.position(),
1335                        ));
1336                    }
1337                    // Recursive parse of the body. Any QueryExpr that the
1338                    // rest of the grammar accepts is valid (Select, Join, etc.).
1339                    let body = self.parse_sql_command()?.into_query_expr();
1340                    return Ok(SqlCommand::CreateView(CreateViewQuery {
1341                        name,
1342                        query: Box::new(body),
1343                        materialized,
1344                        if_not_exists,
1345                        or_replace,
1346                    }));
1347                }
1348                // If OR REPLACE / MATERIALIZED was consumed but VIEW was not,
1349                // bail out — no other CREATE form accepts those modifiers.
1350                if or_replace || materialized {
1351                    return Err(ParseError::expected(
1352                        vec!["VIEW"],
1353                        self.peek(),
1354                        self.position(),
1355                    ));
1356                }
1357
1358                if self.check(&Token::Index) || self.check(&Token::Unique) {
1359                    match self.parse_create_index_query()? {
1360                        QueryExpr::CreateIndex(query) => Ok(SqlCommand::CreateIndex(query)),
1361                        other => Err(ParseError::new(
1362                            format!("internal: CREATE INDEX produced unexpected kind {other:?}"),
1363                            self.position(),
1364                        )),
1365                    }
1366                } else if self.check(&Token::Table) {
1367                    self.expect(Token::Table)?;
1368                    match self.parse_create_table_body()? {
1369                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1370                        other => Err(ParseError::new(
1371                            format!("internal: CREATE TABLE produced unexpected kind {other:?}"),
1372                            self.position(),
1373                        )),
1374                    }
1375                } else if self.check(&Token::Graph) {
1376                    self.advance()?;
1377                    match self.parse_create_collection_model_body(CollectionModel::Graph)? {
1378                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1379                        other => Err(ParseError::new(
1380                            format!("internal: CREATE GRAPH produced unexpected kind {other:?}"),
1381                            self.position(),
1382                        )),
1383                    }
1384                } else if self.check(&Token::Document) {
1385                    self.advance()?;
1386                    match self.parse_create_collection_model_body(CollectionModel::Document)? {
1387                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1388                        other => Err(ParseError::new(
1389                            format!("internal: CREATE DOCUMENT produced unexpected kind {other:?}"),
1390                            self.position(),
1391                        )),
1392                    }
1393                } else if self.check(&Token::Vector) {
1394                    self.advance()?;
1395                    match self.parse_create_vector_body()? {
1396                        QueryExpr::CreateVector(query) => Ok(SqlCommand::CreateVector(query)),
1397                        other => Err(ParseError::new(
1398                            format!("internal: CREATE VECTOR produced unexpected kind {other:?}"),
1399                            self.position(),
1400                        )),
1401                    }
1402                } else if self.check(&Token::Collection) {
1403                    self.advance()?;
1404                    match self.parse_create_collection_body()? {
1405                        QueryExpr::CreateCollection(query) => {
1406                            Ok(SqlCommand::CreateCollection(query))
1407                        }
1408                        other => Err(ParseError::new(
1409                            format!(
1410                                "internal: CREATE COLLECTION produced unexpected kind {other:?}"
1411                            ),
1412                            self.position(),
1413                        )),
1414                    }
1415                } else if self.check(&Token::Kv) {
1416                    self.advance()?;
1417                    match self.parse_create_keyed_body(CollectionModel::Kv)? {
1418                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1419                        other => Err(ParseError::new(
1420                            format!("internal: CREATE KV produced unexpected kind {other:?}"),
1421                            self.position(),
1422                        )),
1423                    }
1424                } else if self.consume_ident_ci("CONFIG")? {
1425                    match self.parse_create_keyed_body(CollectionModel::Config)? {
1426                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1427                        other => Err(ParseError::new(
1428                            format!("internal: CREATE CONFIG produced unexpected kind {other:?}"),
1429                            self.position(),
1430                        )),
1431                    }
1432                } else if self.consume_ident_ci("VAULT")? {
1433                    match self.parse_create_keyed_body(CollectionModel::Vault)? {
1434                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1435                        other => Err(ParseError::new(
1436                            format!("internal: CREATE VAULT produced unexpected kind {other:?}"),
1437                            self.position(),
1438                        )),
1439                    }
1440                } else if self.check(&Token::Timeseries) {
1441                    self.advance()?;
1442                    match self.parse_create_timeseries_body()? {
1443                        QueryExpr::CreateTimeSeries(query) => {
1444                            Ok(SqlCommand::CreateTimeSeries(query))
1445                        }
1446                        other => Err(ParseError::new(
1447                            format!(
1448                                "internal: CREATE TIMESERIES produced unexpected kind {other:?}"
1449                            ),
1450                            self.position(),
1451                        )),
1452                    }
1453                } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
1454                {
1455                    self.advance()?;
1456                    match self.parse_create_hypertable_body()? {
1457                        QueryExpr::CreateTimeSeries(query) => {
1458                            Ok(SqlCommand::CreateTimeSeries(query))
1459                        }
1460                        other => Err(ParseError::new(
1461                            format!(
1462                                "internal: CREATE HYPERTABLE produced unexpected kind {other:?}"
1463                            ),
1464                            self.position(),
1465                        )),
1466                    }
1467                } else if self.check(&Token::Queue) {
1468                    self.advance()?;
1469                    match self.parse_create_queue_body()? {
1470                        QueryExpr::CreateQueue(query) => Ok(SqlCommand::CreateQueue(query)),
1471                        other => Err(ParseError::new(
1472                            format!("internal: CREATE QUEUE produced unexpected kind {other:?}"),
1473                            self.position(),
1474                        )),
1475                    }
1476                } else if self.check(&Token::Tree) {
1477                    self.advance()?;
1478                    match self.parse_create_tree_body()? {
1479                        QueryExpr::CreateTree(query) => Ok(SqlCommand::CreateTree(query)),
1480                        other => Err(ParseError::new(
1481                            format!("internal: CREATE TREE produced unexpected kind {other:?}"),
1482                            self.position(),
1483                        )),
1484                    }
1485                } else if matches!(self.peek(), Token::Ident(n) if
1486                    n.eq_ignore_ascii_case("HLL") ||
1487                    n.eq_ignore_ascii_case("SKETCH") ||
1488                    n.eq_ignore_ascii_case("FILTER"))
1489                {
1490                    match self.parse_create_probabilistic()? {
1491                        QueryExpr::ProbabilisticCommand(command) => {
1492                            Ok(SqlCommand::Probabilistic(command))
1493                        }
1494                        other => Err(ParseError::new(
1495                            format!(
1496                                "internal: CREATE probabilistic produced unexpected kind {other:?}"
1497                            ),
1498                            self.position(),
1499                        )),
1500                    }
1501                } else if self.check(&Token::Schema) {
1502                    // CREATE SCHEMA [IF NOT EXISTS] name
1503                    self.advance()?;
1504                    let if_not_exists = self.match_if_not_exists()?;
1505                    let name = self.expect_ident()?;
1506                    Ok(SqlCommand::CreateSchema(CreateSchemaQuery {
1507                        name,
1508                        if_not_exists,
1509                    }))
1510                } else if self.check(&Token::Policy) {
1511                    // Two forms share the leading `CREATE POLICY` tokens:
1512                    //   * IAM:   CREATE POLICY '<id>' AS '<json>'          (string literal id)
1513                    //   * RLS:   CREATE POLICY <name> ON <target> ...      (bare ident name)
1514                    // Disambiguate by peeking the token after POLICY.
1515                    self.advance()?;
1516                    if matches!(self.peek(), Token::String(_)) {
1517                        // IAM form — short-circuit out of the SQL command stack.
1518                        let expr = self.parse_create_iam_policy_after_keywords()?;
1519                        // Inline command-wrapping: produce a synthetic SqlCommand by
1520                        // routing through a generic IAM admin holder. We don't
1521                        // have a dedicated SqlCommand variant for IAM yet, so we
1522                        // bounce through the existing Grant-shaped Admin slot
1523                        // which expects no further tokens.
1524                        return Ok(SqlCommand::IamPolicy(expr));
1525                    }
1526                    let name = self.expect_ident()?;
1527                    self.expect(Token::On)?;
1528
1529                    let (target_kind, table) = {
1530                        use crate::storage::query::ast::PolicyTargetKind;
1531                        let kw = match self.peek() {
1532                            Token::Ident(s) => Some(s.to_ascii_uppercase()),
1533                            _ => None,
1534                        };
1535                        let kind = kw.as_deref().and_then(|k| match k {
1536                            "NODES" => Some(PolicyTargetKind::Nodes),
1537                            "EDGES" => Some(PolicyTargetKind::Edges),
1538                            "VECTORS" => Some(PolicyTargetKind::Vectors),
1539                            "MESSAGES" => Some(PolicyTargetKind::Messages),
1540                            "POINTS" => Some(PolicyTargetKind::Points),
1541                            "DOCUMENTS" => Some(PolicyTargetKind::Documents),
1542                            _ => None,
1543                        });
1544                        if let Some(k) = kind {
1545                            self.advance()?;
1546                            self.expect(Token::Of)?;
1547                            let coll = self.expect_ident()?;
1548                            (k, coll)
1549                        } else {
1550                            let coll = self.expect_ident()?;
1551                            (PolicyTargetKind::Table, coll)
1552                        }
1553                    };
1554
1555                    let action = if self.consume(&Token::For)? {
1556                        let a = match self.peek() {
1557                            Token::Select => {
1558                                self.advance()?;
1559                                Some(PolicyAction::Select)
1560                            }
1561                            Token::Insert => {
1562                                self.advance()?;
1563                                Some(PolicyAction::Insert)
1564                            }
1565                            Token::Update => {
1566                                self.advance()?;
1567                                Some(PolicyAction::Update)
1568                            }
1569                            Token::Delete => {
1570                                self.advance()?;
1571                                Some(PolicyAction::Delete)
1572                            }
1573                            Token::All => {
1574                                self.advance()?;
1575                                None
1576                            }
1577                            _ => None,
1578                        };
1579                        a
1580                    } else {
1581                        None
1582                    };
1583
1584                    let role = if self.consume(&Token::To)? {
1585                        Some(self.expect_ident()?)
1586                    } else {
1587                        None
1588                    };
1589
1590                    self.expect(Token::Using)?;
1591                    self.expect(Token::LParen)?;
1592                    let filter = self.parse_filter()?;
1593                    self.expect(Token::RParen)?;
1594
1595                    Ok(SqlCommand::CreatePolicy(CreatePolicyQuery {
1596                        name,
1597                        table,
1598                        action,
1599                        role,
1600                        using: Box::new(filter),
1601                        target_kind,
1602                    }))
1603                } else if self.check(&Token::Server) {
1604                    // CREATE SERVER [IF NOT EXISTS] name
1605                    //   FOREIGN DATA WRAPPER kind
1606                    //   [OPTIONS (key 'value', ...)]
1607                    self.advance()?;
1608                    let if_not_exists = self.match_if_not_exists()?;
1609                    let name = self.expect_ident()?;
1610                    self.expect(Token::Foreign)?;
1611                    self.expect(Token::Data)?;
1612                    self.expect(Token::Wrapper)?;
1613                    let wrapper = self.expect_ident()?;
1614                    let options = self.parse_fdw_options_clause()?;
1615                    Ok(SqlCommand::CreateServer(CreateServerQuery {
1616                        name,
1617                        wrapper,
1618                        options,
1619                        if_not_exists,
1620                    }))
1621                } else if self.check(&Token::Foreign) {
1622                    // CREATE FOREIGN TABLE [IF NOT EXISTS] name (cols)
1623                    //   SERVER server_name
1624                    //   [OPTIONS (key 'value', ...)]
1625                    self.advance()?;
1626                    self.expect(Token::Table)?;
1627                    let if_not_exists = self.match_if_not_exists()?;
1628                    let name = self.expect_ident()?;
1629                    self.expect(Token::LParen)?;
1630                    let mut columns = Vec::new();
1631                    loop {
1632                        let col_name = self.expect_ident()?;
1633                        let data_type = self.expect_ident_or_keyword()?;
1634                        // Inline NOT NULL check — the CREATE TABLE path's helper is
1635                        // private and coupling to it just for FDW columns isn't worth it.
1636                        let mut not_null = false;
1637                        if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NOT")) {
1638                            self.advance()?;
1639                            if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NULL"))
1640                            {
1641                                self.advance()?;
1642                                not_null = true;
1643                            }
1644                        }
1645                        columns.push(ForeignColumnDef {
1646                            name: col_name,
1647                            data_type,
1648                            not_null,
1649                        });
1650                        if !self.consume(&Token::Comma)? {
1651                            break;
1652                        }
1653                    }
1654                    self.expect(Token::RParen)?;
1655                    self.expect(Token::Server)?;
1656                    let server = self.expect_ident()?;
1657                    let options = self.parse_fdw_options_clause()?;
1658                    Ok(SqlCommand::CreateForeignTable(CreateForeignTableQuery {
1659                        name,
1660                        server,
1661                        columns,
1662                        options,
1663                        if_not_exists,
1664                    }))
1665                } else if self.check(&Token::Sequence) {
1666                    // CREATE SEQUENCE [IF NOT EXISTS] name
1667                    //   [START [WITH] n] [INCREMENT [BY] n]
1668                    self.advance()?;
1669                    let if_not_exists = self.match_if_not_exists()?;
1670                    let name = self.expect_ident()?;
1671                    let mut start: i64 = 1;
1672                    let mut increment: i64 = 1;
1673                    // Loop over optional clauses in any order.
1674                    loop {
1675                        if self.consume(&Token::Start)? {
1676                            // Accept `START 100` or `START WITH 100`.
1677                            let _ = self.consume(&Token::With)? || self.consume_ident_ci("WITH")?;
1678                            start = self.parse_integer()?;
1679                        } else if self.consume(&Token::Increment)? {
1680                            // Accept `INCREMENT 5` or `INCREMENT BY 5`.
1681                            let _ = self.consume(&Token::By)? || self.consume_ident_ci("BY")?;
1682                            increment = self.parse_integer()?;
1683                        } else {
1684                            break;
1685                        }
1686                    }
1687                    Ok(SqlCommand::CreateSequence(CreateSequenceQuery {
1688                        name,
1689                        if_not_exists,
1690                        start,
1691                        increment,
1692                    }))
1693                } else if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
1694                {
1695                    self.advance()?; // consume MIGRATION
1696                    match self.parse_create_migration_body()? {
1697                        QueryExpr::CreateMigration(q) => Ok(SqlCommand::CreateMigration(q)),
1698                        other => Err(ParseError::new(
1699                            format!(
1700                                "internal: CREATE MIGRATION produced unexpected kind {other:?}"
1701                            ),
1702                            self.position(),
1703                        )),
1704                    }
1705                } else if let Some(err) =
1706                    ParseError::unsupported_recognized_token(self.peek(), self.position())
1707                {
1708                    Err(err)
1709                } else {
1710                    Err(ParseError::expected(
1711                        vec![
1712                            "TABLE",
1713                            "GRAPH",
1714                            "VECTOR",
1715                            "DOCUMENT",
1716                            "KV",
1717                            "COLLECTION",
1718                            "INDEX",
1719                            "UNIQUE",
1720                            "TIMESERIES",
1721                            "QUEUE",
1722                            "TREE",
1723                            "HLL",
1724                            "SKETCH",
1725                            "FILTER",
1726                            "SCHEMA",
1727                            "SEQUENCE",
1728                            "MIGRATION",
1729                        ],
1730                        self.peek(),
1731                        pos,
1732                    ))
1733                }
1734            }
1735            Token::Drop => {
1736                let pos = self.position();
1737                self.advance()?;
1738
1739                // DROP [MATERIALIZED] VIEW [IF EXISTS] name
1740                let materialized = self.consume(&Token::Materialized)?;
1741                if self.check(&Token::View) {
1742                    self.advance()?;
1743                    let if_exists = self.match_if_exists()?;
1744                    let name = self.expect_ident()?;
1745                    return Ok(SqlCommand::DropView(DropViewQuery {
1746                        name,
1747                        materialized,
1748                        if_exists,
1749                    }));
1750                }
1751                if materialized {
1752                    return Err(ParseError::expected(
1753                        vec!["VIEW"],
1754                        self.peek(),
1755                        self.position(),
1756                    ));
1757                }
1758
1759                if self.check(&Token::Index) {
1760                    match self.parse_drop_index_query()? {
1761                        QueryExpr::DropIndex(query) => Ok(SqlCommand::DropIndex(query)),
1762                        other => Err(ParseError::new(
1763                            format!("internal: DROP INDEX produced unexpected kind {other:?}"),
1764                            self.position(),
1765                        )),
1766                    }
1767                } else if self.check(&Token::Table) {
1768                    self.expect(Token::Table)?;
1769                    match self.parse_drop_table_body()? {
1770                        QueryExpr::DropTable(query) => Ok(SqlCommand::DropTable(query)),
1771                        other => Err(ParseError::new(
1772                            format!("internal: DROP TABLE produced unexpected kind {other:?}"),
1773                            self.position(),
1774                        )),
1775                    }
1776                } else if self.check(&Token::Graph) {
1777                    self.advance()?;
1778                    match self.parse_drop_graph_body()? {
1779                        QueryExpr::DropGraph(query) => Ok(SqlCommand::DropGraph(query)),
1780                        other => Err(ParseError::new(
1781                            format!("internal: DROP GRAPH produced unexpected kind {other:?}"),
1782                            self.position(),
1783                        )),
1784                    }
1785                } else if self.check(&Token::Vector) {
1786                    self.advance()?;
1787                    match self.parse_drop_vector_body()? {
1788                        QueryExpr::DropVector(query) => Ok(SqlCommand::DropVector(query)),
1789                        other => Err(ParseError::new(
1790                            format!("internal: DROP VECTOR produced unexpected kind {other:?}"),
1791                            self.position(),
1792                        )),
1793                    }
1794                } else if self.check(&Token::Document) {
1795                    self.advance()?;
1796                    match self.parse_drop_document_body()? {
1797                        QueryExpr::DropDocument(query) => Ok(SqlCommand::DropDocument(query)),
1798                        other => Err(ParseError::new(
1799                            format!("internal: DROP DOCUMENT produced unexpected kind {other:?}"),
1800                            self.position(),
1801                        )),
1802                    }
1803                } else if self.check(&Token::Kv) {
1804                    self.advance()?;
1805                    match self.parse_drop_kv_body()? {
1806                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1807                        other => Err(ParseError::new(
1808                            format!("internal: DROP KV produced unexpected kind {other:?}"),
1809                            self.position(),
1810                        )),
1811                    }
1812                } else if self.consume_ident_ci("CONFIG")? {
1813                    match self.parse_drop_keyed_body(CollectionModel::Config)? {
1814                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1815                        other => Err(ParseError::new(
1816                            format!("internal: DROP CONFIG produced unexpected kind {other:?}"),
1817                            self.position(),
1818                        )),
1819                    }
1820                } else if self.consume_ident_ci("VAULT")? {
1821                    match self.parse_drop_keyed_body(CollectionModel::Vault)? {
1822                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1823                        other => Err(ParseError::new(
1824                            format!("internal: DROP VAULT produced unexpected kind {other:?}"),
1825                            self.position(),
1826                        )),
1827                    }
1828                } else if self.check(&Token::Collection) {
1829                    self.advance()?;
1830                    match self.parse_drop_collection_body()? {
1831                        QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
1832                        other => Err(ParseError::new(
1833                            format!("internal: DROP COLLECTION produced unexpected kind {other:?}"),
1834                            self.position(),
1835                        )),
1836                    }
1837                } else if self.check(&Token::Timeseries) {
1838                    self.advance()?;
1839                    match self.parse_drop_timeseries_body()? {
1840                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
1841                        other => Err(ParseError::new(
1842                            format!("internal: DROP TIMESERIES produced unexpected kind {other:?}"),
1843                            self.position(),
1844                        )),
1845                    }
1846                } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
1847                {
1848                    // DROP HYPERTABLE name reuses the same AST as
1849                    // DROP TIMESERIES — runtime clears the registry
1850                    // entry *and* drops the backing collection.
1851                    self.advance()?;
1852                    match self.parse_drop_timeseries_body()? {
1853                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
1854                        other => Err(ParseError::new(
1855                            format!("internal: DROP HYPERTABLE produced unexpected kind {other:?}"),
1856                            self.position(),
1857                        )),
1858                    }
1859                } else if self.check(&Token::Queue) {
1860                    self.advance()?;
1861                    match self.parse_drop_queue_body()? {
1862                        QueryExpr::DropQueue(query) => Ok(SqlCommand::DropQueue(query)),
1863                        other => Err(ParseError::new(
1864                            format!("internal: DROP QUEUE produced unexpected kind {other:?}"),
1865                            self.position(),
1866                        )),
1867                    }
1868                } else if self.check(&Token::Tree) {
1869                    self.advance()?;
1870                    match self.parse_drop_tree_body()? {
1871                        QueryExpr::DropTree(query) => Ok(SqlCommand::DropTree(query)),
1872                        other => Err(ParseError::new(
1873                            format!("internal: DROP TREE produced unexpected kind {other:?}"),
1874                            self.position(),
1875                        )),
1876                    }
1877                } else if matches!(self.peek(), Token::Ident(n) if
1878                    n.eq_ignore_ascii_case("HLL") ||
1879                    n.eq_ignore_ascii_case("SKETCH") ||
1880                    n.eq_ignore_ascii_case("FILTER"))
1881                {
1882                    match self.parse_drop_probabilistic()? {
1883                        QueryExpr::ProbabilisticCommand(command) => {
1884                            Ok(SqlCommand::Probabilistic(command))
1885                        }
1886                        other => Err(ParseError::new(
1887                            format!(
1888                                "internal: DROP probabilistic produced unexpected kind {other:?}"
1889                            ),
1890                            self.position(),
1891                        )),
1892                    }
1893                } else if self.check(&Token::Schema) {
1894                    // DROP SCHEMA [IF EXISTS] name [CASCADE]
1895                    self.advance()?;
1896                    let if_exists = self.match_if_exists()?;
1897                    let name = self.expect_ident()?;
1898                    let cascade = self.consume(&Token::Cascade)?;
1899                    Ok(SqlCommand::DropSchema(DropSchemaQuery {
1900                        name,
1901                        if_exists,
1902                        cascade,
1903                    }))
1904                } else if self.check(&Token::Policy) {
1905                    // Two forms:
1906                    //   * IAM:   DROP POLICY '<id>'
1907                    //   * RLS:   DROP POLICY [IF EXISTS] name ON table
1908                    self.advance()?;
1909                    if matches!(self.peek(), Token::String(_)) {
1910                        let expr = self.parse_drop_iam_policy_after_keywords()?;
1911                        return Ok(SqlCommand::IamPolicy(expr));
1912                    }
1913                    let if_exists = self.match_if_exists()?;
1914                    let name = self.expect_ident()?;
1915                    self.expect(Token::On)?;
1916                    let table = self.expect_ident()?;
1917                    Ok(SqlCommand::DropPolicy(DropPolicyQuery {
1918                        name,
1919                        table,
1920                        if_exists,
1921                    }))
1922                } else if self.check(&Token::Server) {
1923                    // DROP SERVER [IF EXISTS] name [CASCADE]
1924                    self.advance()?;
1925                    let if_exists = self.match_if_exists()?;
1926                    let name = self.expect_ident()?;
1927                    let cascade = self.consume(&Token::Cascade)?;
1928                    Ok(SqlCommand::DropServer(DropServerQuery {
1929                        name,
1930                        if_exists,
1931                        cascade,
1932                    }))
1933                } else if self.check(&Token::Foreign) {
1934                    // DROP FOREIGN TABLE [IF EXISTS] name
1935                    self.advance()?;
1936                    self.expect(Token::Table)?;
1937                    let if_exists = self.match_if_exists()?;
1938                    let name = self.expect_ident()?;
1939                    Ok(SqlCommand::DropForeignTable(DropForeignTableQuery {
1940                        name,
1941                        if_exists,
1942                    }))
1943                } else if self.check(&Token::Sequence) {
1944                    // DROP SEQUENCE [IF EXISTS] name
1945                    self.advance()?;
1946                    let if_exists = self.match_if_exists()?;
1947                    let name = self.expect_ident()?;
1948                    Ok(SqlCommand::DropSequence(DropSequenceQuery {
1949                        name,
1950                        if_exists,
1951                    }))
1952                } else if let Some(err) =
1953                    ParseError::unsupported_recognized_token(self.peek(), self.position())
1954                {
1955                    Err(err)
1956                } else {
1957                    Err(ParseError::expected(
1958                        vec![
1959                            "TABLE",
1960                            "INDEX",
1961                            "TIMESERIES",
1962                            "QUEUE",
1963                            "TREE",
1964                            "HLL",
1965                            "SKETCH",
1966                            "FILTER",
1967                            "SCHEMA",
1968                            "SEQUENCE",
1969                        ],
1970                        self.peek(),
1971                        pos,
1972                    ))
1973                }
1974            }
1975            Token::Alter => {
1976                // Disambiguate ALTER USER / ALTER QUEUE / ALTER TABLE without
1977                // committing to a path until we've seen the target.
1978                // We peek the *next* token (without consuming) and
1979                // dispatch accordingly.
1980                let next = self.peek_next()?.clone();
1981                if matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("USER")) {
1982                    self.advance()?; // consume ALTER
1983                    let stmt = self.parse_alter_user_statement()?;
1984                    Ok(SqlCommand::AlterUser(stmt))
1985                } else if matches!(next, Token::Queue) {
1986                    self.advance()?; // consume ALTER
1987                    self.advance()?; // consume QUEUE
1988                    match self.parse_alter_queue_body()? {
1989                        QueryExpr::AlterQueue(query) => Ok(SqlCommand::AlterQueue(query)),
1990                        other => Err(ParseError::new(
1991                            format!("internal: ALTER QUEUE produced unexpected kind {other:?}"),
1992                            self.position(),
1993                        )),
1994                    }
1995                } else if matches!(next, Token::Table) {
1996                    match self.parse_alter_table_query()? {
1997                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
1998                        other => Err(ParseError::new(
1999                            format!(
2000                                "internal: ALTER TABLE produced unexpected query kind {other:?}"
2001                            ),
2002                            self.position(),
2003                        )),
2004                    }
2005                } else if let Some(err) =
2006                    ParseError::unsupported_recognized_token(&next, self.position())
2007                {
2008                    Err(err)
2009                } else {
2010                    match self.parse_alter_table_query()? {
2011                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2012                        other => Err(ParseError::new(
2013                            format!("internal: ALTER produced unexpected query kind {other:?}"),
2014                            self.position(),
2015                        )),
2016                    }
2017                }
2018            }
2019            Token::Ident(name) if name.eq_ignore_ascii_case("GRANT") => {
2020                let stmt = self.parse_grant_statement()?;
2021                Ok(SqlCommand::Grant(stmt))
2022            }
2023            Token::Ident(name) if name.eq_ignore_ascii_case("REVOKE") => {
2024                let stmt = self.parse_revoke_statement()?;
2025                Ok(SqlCommand::Revoke(stmt))
2026            }
2027            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
2028                self.advance()?;
2029                if self.consume_ident_ci("BACKFILL")? {
2030                    return Err(ParseError::new(
2031                        "EVENTS BACKFILL STATUS is not implemented; EVENTS BACKFILL runtime is available but durable progress tracking is not"
2032                            .to_string(),
2033                        self.position(),
2034                    ));
2035                }
2036                if !self.consume_ident_ci("STATUS")? {
2037                    return Err(ParseError::expected(
2038                        vec!["STATUS"],
2039                        self.peek(),
2040                        self.position(),
2041                    ));
2042                }
2043
2044                let mut query = TableQuery::new("red.subscriptions");
2045                let collection = match self.peek().clone() {
2046                    Token::Ident(name) => {
2047                        self.advance()?;
2048                        Some(name)
2049                    }
2050                    Token::String(name) => {
2051                        self.advance()?;
2052                        Some(name)
2053                    }
2054                    _ => None,
2055                };
2056                self.parse_table_clauses(&mut query)?;
2057                if let Some(collection) = collection {
2058                    let filter = Filter::compare(
2059                        FieldRef::column("red.subscriptions", "collection"),
2060                        CompareOp::Eq,
2061                        Value::text(collection),
2062                    );
2063                    let expr = filter_to_expr(&filter);
2064                    query.where_expr = Some(match query.where_expr.take() {
2065                        Some(existing) => Expr::binop(BinOp::And, existing, expr),
2066                        None => expr,
2067                    });
2068                    query.filter = Some(match query.filter.take() {
2069                        Some(existing) => existing.and(filter),
2070                        None => filter,
2071                    });
2072                }
2073                Ok(SqlCommand::Select(query))
2074            }
2075            Token::Attach => {
2076                let expr = self.parse_attach_policy()?;
2077                Ok(SqlCommand::IamPolicy(expr))
2078            }
2079            Token::Detach => {
2080                let expr = self.parse_detach_policy()?;
2081                Ok(SqlCommand::IamPolicy(expr))
2082            }
2083            Token::Ident(name) if name.eq_ignore_ascii_case("SIMULATE") => {
2084                let expr = self.parse_simulate_policy()?;
2085                Ok(SqlCommand::IamPolicy(expr))
2086            }
2087            Token::Set => {
2088                self.advance()?;
2089                if self.consume_ident_ci("CONFIG")? {
2090                    let full_key = self.parse_dotted_admin_path(true)?;
2091                    self.expect(Token::Eq)?;
2092                    let value = self.parse_literal_value()?;
2093                    Ok(SqlCommand::SetConfig {
2094                        key: full_key,
2095                        value,
2096                    })
2097                } else if self.consume_ident_ci("SECRET")? {
2098                    let key = self.parse_dotted_admin_path(true)?;
2099                    self.expect(Token::Eq)?;
2100                    let value = self.parse_literal_value()?;
2101                    Ok(SqlCommand::SetSecret { key, value })
2102                } else if self.consume_ident_ci("TENANT")? {
2103                    // SET TENANT 'id'  |  SET TENANT = 'id'  |
2104                    // SET TENANT NULL  |  SET TENANT = NULL
2105                    let _ = self.consume(&Token::Eq)?;
2106                    if self.consume_ident_ci("NULL")? {
2107                        Ok(SqlCommand::SetTenant(None))
2108                    } else {
2109                        let value = self.parse_literal_value()?;
2110                        match value {
2111                            Value::Text(s) => Ok(SqlCommand::SetTenant(Some(s.to_string()))),
2112                            Value::Null => Ok(SqlCommand::SetTenant(None)),
2113                            other => Err(ParseError::new(
2114                                format!("SET TENANT expects a text literal or NULL, got {other:?}"),
2115                                self.position(),
2116                            )),
2117                        }
2118                    }
2119                } else {
2120                    Err(ParseError::expected(
2121                        vec!["CONFIG", "SECRET", "TENANT"],
2122                        self.peek(),
2123                        self.position(),
2124                    ))
2125                }
2126            }
2127            Token::Ident(name) if name.eq_ignore_ascii_case("APPLY") => {
2128                self.advance()?;
2129                match self.parse_apply_migration()? {
2130                    QueryExpr::ApplyMigration(q) => Ok(SqlCommand::ApplyMigration(q)),
2131                    other => Err(ParseError::new(
2132                        format!("internal: APPLY MIGRATION produced unexpected kind {other:?}"),
2133                        self.position(),
2134                    )),
2135                }
2136            }
2137            Token::Ident(name) if name.eq_ignore_ascii_case("RESET") => {
2138                // RESET TENANT — session-local clear
2139                self.advance()?;
2140                if self.consume_ident_ci("TENANT")? {
2141                    Ok(SqlCommand::SetTenant(None))
2142                } else {
2143                    Err(ParseError::expected(
2144                        vec!["TENANT"],
2145                        self.peek(),
2146                        self.position(),
2147                    ))
2148                }
2149            }
2150            Token::Ident(name)
2151                if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
2152            {
2153                self.advance()?;
2154                let collection = self.parse_dotted_admin_path(false)?;
2155                let mut query = TableQuery::new("red.describe");
2156                query.filter = Some(Filter::compare(
2157                    FieldRef::column("", "collection"),
2158                    CompareOp::Eq,
2159                    Value::text(collection),
2160                ));
2161                Ok(SqlCommand::Select(query))
2162            }
2163            Token::Desc => {
2164                self.advance()?;
2165                let collection = self.parse_dotted_admin_path(false)?;
2166                let mut query = TableQuery::new("red.describe");
2167                query.filter = Some(Filter::compare(
2168                    FieldRef::column("", "collection"),
2169                    CompareOp::Eq,
2170                    Value::text(collection),
2171                ));
2172                Ok(SqlCommand::Select(query))
2173            }
2174            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
2175                self.advance()?;
2176                if self.consume_ident_ci("CONFIG")? {
2177                    // Accept dotted prefixes the same way SET CONFIG does
2178                    // (`SHOW CONFIG durability.mode`), and empty prefix
2179                    // (`SHOW CONFIG`) for a catalog-wide listing.
2180                    let prefix = if !self.check(&Token::Eof) {
2181                        let first = self.expect_ident()?;
2182                        let mut full = first;
2183                        while self.consume(&Token::Dot)? {
2184                            let next = self.expect_ident_or_keyword()?;
2185                            full = format!("{full}.{next}");
2186                        }
2187                        // Match SET CONFIG: lowercase so keyword segments
2188                        // come out consistent with the stored keys.
2189                        Some(full.to_ascii_lowercase())
2190                    } else {
2191                        None
2192                    };
2193                    Ok(SqlCommand::ShowConfig { prefix })
2194                } else if self.consume_ident_ci("COLLECTIONS")? {
2195                    let mut query = TableQuery::new("red.collections");
2196                    let include_internal = if self.consume_ident_ci("INCLUDING")? {
2197                        if !self.consume_ident_ci("INTERNAL")? {
2198                            return Err(ParseError::expected(
2199                                vec!["INTERNAL"],
2200                                self.peek(),
2201                                self.position(),
2202                            ));
2203                        }
2204                        true
2205                    } else {
2206                        false
2207                    };
2208                    self.parse_table_clauses(&mut query)?;
2209                    if !include_internal {
2210                        let user_filter = query.filter.take();
2211                        let hide_internal = crate::storage::query::ast::Filter::Compare {
2212                            field: FieldRef::column("", "internal"),
2213                            op: CompareOp::Eq,
2214                            value: Value::Boolean(false),
2215                        };
2216                        query.filter = Some(match user_filter {
2217                            Some(filter) => filter.and(hide_internal),
2218                            None => hide_internal,
2219                        });
2220                    }
2221                    Ok(SqlCommand::Select(query))
2222                } else if self.consume_ident_ci("TABLES")? {
2223                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2224                        self, "table",
2225                    )?))
2226                } else if self.consume_ident_ci("QUEUES")? {
2227                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2228                        self, "queue",
2229                    )?))
2230                } else if self.consume(&Token::Vectors)? || self.consume_ident_ci("VECTORS")? {
2231                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2232                        self, "vector",
2233                    )?))
2234                } else if self.consume_ident_ci("DOCUMENTS")? {
2235                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2236                        self, "document",
2237                    )?))
2238                } else if self.consume(&Token::Timeseries)?
2239                    || self.consume_ident_ci("TIMESERIES")?
2240                {
2241                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2242                        self,
2243                        "timeseries",
2244                    )?))
2245                } else if self.consume_ident_ci("GRAPHS")? {
2246                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2247                        self, "graph",
2248                    )?))
2249                } else if self.consume_ident_ci("CONFIGS")? {
2250                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2251                        self, "config",
2252                    )?))
2253                } else if self.consume_ident_ci("VAULTS")? {
2254                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2255                        self, "vault",
2256                    )?))
2257                } else if self.consume(&Token::Kv)?
2258                    || self.consume_ident_ci("KV")?
2259                    || self.consume_ident_ci("KVS")?
2260                {
2261                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2262                        self, "kv",
2263                    )?))
2264                } else if self.consume(&Token::Schema)? || self.consume_ident_ci("SCHEMA")? {
2265                    let collection = self.parse_dotted_admin_path(false)?;
2266                    let mut query = TableQuery::new("red.columns");
2267                    query.filter = Some(Filter::compare(
2268                        FieldRef::column("", "collection"),
2269                        CompareOp::Eq,
2270                        Value::text(collection),
2271                    ));
2272                    Ok(SqlCommand::Select(query))
2273                } else if self.consume_ident_ci("INDICES")? {
2274                    let mut query = TableQuery::new("red.indices");
2275                    if self.consume(&Token::On)? {
2276                        let collection = self.expect_ident_or_keyword()?;
2277                        let filter = Filter::Compare {
2278                            field: FieldRef::column("red.indices", "collection"),
2279                            op: CompareOp::Eq,
2280                            value: Value::text(collection),
2281                        };
2282                        query.where_expr = Some(filter_to_expr(&filter));
2283                        query.filter = Some(filter);
2284                    }
2285                    self.parse_table_clauses(&mut query)?;
2286                    Ok(SqlCommand::Select(query))
2287                } else if self.consume_ident_ci("POLICIES")? {
2288                    if self.consume(&Token::For)? || self.consume_ident_ci("FOR")? {
2289                        let principal = self.parse_iam_principal_kind()?;
2290                        return Ok(SqlCommand::IamPolicy(QueryExpr::ShowPolicies {
2291                            filter: Some(principal),
2292                        }));
2293                    }
2294                    let mut query = TableQuery::new("red.policies");
2295                    let collection_filter =
2296                        if self.consume(&Token::On)? || self.consume_ident_ci("ON")? {
2297                            let collection = self.parse_dotted_admin_path(false)?;
2298                            Some(Filter::Compare {
2299                                field: FieldRef::TableColumn {
2300                                    table: String::new(),
2301                                    column: "collection".to_string(),
2302                                },
2303                                op: CompareOp::Eq,
2304                                value: Value::text(collection),
2305                            })
2306                        } else {
2307                            None
2308                        };
2309                    self.parse_table_clauses(&mut query)?;
2310                    if let Some(collection_filter) = collection_filter {
2311                        let combined = match query.filter.take() {
2312                            Some(existing) => {
2313                                Filter::And(Box::new(collection_filter), Box::new(existing))
2314                            }
2315                            None => collection_filter,
2316                        };
2317                        query.where_expr = Some(filter_to_expr(&combined));
2318                        query.filter = Some(combined);
2319                    }
2320                    Ok(SqlCommand::Select(query))
2321                } else if self.consume_ident_ci("STATS")? {
2322                    let mut query = TableQuery::new("red.stats");
2323                    let collection = match self.peek().clone() {
2324                        Token::Ident(name) => {
2325                            self.advance()?;
2326                            Some(name)
2327                        }
2328                        Token::String(name) => {
2329                            self.advance()?;
2330                            Some(name)
2331                        }
2332                        _ => None,
2333                    };
2334                    self.parse_table_clauses(&mut query)?;
2335                    if let Some(collection) = collection {
2336                        let filter = Filter::compare(
2337                            FieldRef::column("red.stats", "collection"),
2338                            CompareOp::Eq,
2339                            Value::text(collection),
2340                        );
2341                        let expr = filter_to_expr(&filter);
2342                        query.where_expr = Some(match query.where_expr.take() {
2343                            Some(existing) => Expr::binop(BinOp::And, existing, expr),
2344                            None => expr,
2345                        });
2346                        query.filter = Some(match query.filter.take() {
2347                            Some(existing) => existing.and(filter),
2348                            None => filter,
2349                        });
2350                    }
2351                    Ok(SqlCommand::Select(query))
2352                } else if self.consume_ident_ci("SAMPLE")? {
2353                    let mut query = TableQuery::new(&self.expect_ident()?);
2354                    query.limit = if self.consume(&Token::Limit)? {
2355                        Some(self.parse_integer()? as u64)
2356                    } else {
2357                        Some(10)
2358                    };
2359                    Ok(SqlCommand::Select(query))
2360                } else if self.consume_ident_ci("SECRET")? || self.consume_ident_ci("SECRETS")? {
2361                    let prefix = if !self.check(&Token::Eof) {
2362                        Some(self.parse_dotted_admin_path(true)?)
2363                    } else {
2364                        None
2365                    };
2366                    Ok(SqlCommand::ShowSecrets { prefix })
2367                } else if self.consume_ident_ci("TENANT")? {
2368                    Ok(SqlCommand::ShowTenant)
2369                } else if let Some(expr) = self.parse_show_iam_after_show()? {
2370                    Ok(SqlCommand::IamPolicy(expr))
2371                } else {
2372                    Err(ParseError::expected(
2373                        vec![
2374                            "CONFIG",
2375                            "SECRET",
2376                            "SECRETS",
2377                            "COLLECTIONS",
2378                            "TABLES",
2379                            "QUEUES",
2380                            "VECTORS",
2381                            "DOCUMENTS",
2382                            "TIMESERIES",
2383                            "GRAPHS",
2384                            "KV",
2385                            "SCHEMA",
2386                            "INDICES",
2387                            "SAMPLE",
2388                            "POLICIES",
2389                            "STATS",
2390                            "TENANT",
2391                            "EFFECTIVE",
2392                        ],
2393                        self.peek(),
2394                        self.position(),
2395                    ))
2396                }
2397            }
2398            // Transaction control statements (Phase 1.1 PG parity).
2399            // BEGIN [WORK | TRANSACTION] [ISOLATION LEVEL <mode>]
2400            // START TRANSACTION [ISOLATION LEVEL <mode>]
2401            //
2402            // We only implement SNAPSHOT ISOLATION (our default). We
2403            // accept READ UNCOMMITTED / READ COMMITTED / REPEATABLE
2404            // READ / SNAPSHOT as PG-compatible no-ops, but reject
2405            // SERIALIZABLE outright — the previous behaviour of
2406            // silently degrading to snapshot made the parser
2407            // dishonest. Real SSI (Serializable Snapshot Isolation)
2408            // is tracked as a future milestone.
2409            Token::Begin | Token::Start => {
2410                self.advance()?;
2411                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2412                // Optional ISOLATION LEVEL clause.
2413                if self.consume_ident_ci("ISOLATION")? {
2414                    self.expect(Token::Level)?;
2415                    // The level identifier can span multiple words
2416                    // (READ UNCOMMITTED / READ COMMITTED / REPEATABLE
2417                    // READ). Collect them case-insensitively.
2418                    let mut parts: Vec<String> = Vec::new();
2419                    if self.consume_ident_ci("READ")? {
2420                        parts.push("READ".to_string());
2421                        if self.consume_ident_ci("UNCOMMITTED")? {
2422                            parts.push("UNCOMMITTED".to_string());
2423                        } else if self.consume_ident_ci("COMMITTED")? {
2424                            parts.push("COMMITTED".to_string());
2425                        } else {
2426                            return Err(ParseError::expected(
2427                                vec!["UNCOMMITTED", "COMMITTED"],
2428                                self.peek(),
2429                                self.position(),
2430                            ));
2431                        }
2432                    } else if self.consume_ident_ci("REPEATABLE")? {
2433                        parts.push("REPEATABLE".to_string());
2434                        if !self.consume_ident_ci("READ")? {
2435                            return Err(ParseError::expected(
2436                                vec!["READ"],
2437                                self.peek(),
2438                                self.position(),
2439                            ));
2440                        }
2441                        parts.push("READ".to_string());
2442                    } else if self.consume_ident_ci("SNAPSHOT")? {
2443                        parts.push("SNAPSHOT".to_string());
2444                    } else if self.consume_ident_ci("SERIALIZABLE")? {
2445                        return Err(ParseError::new(
2446                            "ISOLATION LEVEL SERIALIZABLE is not yet supported — reddb \
2447                             currently provides SNAPSHOT ISOLATION (which PG calls \
2448                             REPEATABLE READ). Use REPEATABLE READ / SNAPSHOT / \
2449                             READ COMMITTED, or omit ISOLATION LEVEL for the default."
2450                                .to_string(),
2451                            self.position(),
2452                        ));
2453                    } else {
2454                        return Err(ParseError::expected(
2455                            vec!["READ", "REPEATABLE", "SNAPSHOT", "SERIALIZABLE"],
2456                            self.peek(),
2457                            self.position(),
2458                        ));
2459                    }
2460                    // All accepted modes map to our snapshot engine today.
2461                    let _ = parts;
2462                }
2463                Ok(SqlCommand::TransactionControl(TxnControl::Begin))
2464            }
2465            // COMMIT [WORK | TRANSACTION]
2466            Token::Commit => {
2467                self.advance()?;
2468                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2469                Ok(SqlCommand::TransactionControl(TxnControl::Commit))
2470            }
2471            // ROLLBACK [WORK | TRANSACTION] [TO [SAVEPOINT] name]
2472            // ROLLBACK MIGRATION name
2473            Token::Rollback => {
2474                self.advance()?;
2475                if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
2476                    match self.parse_rollback_migration_after_keyword()? {
2477                        QueryExpr::RollbackMigration(q) => Ok(SqlCommand::RollbackMigration(q)),
2478                        other => Err(ParseError::new(
2479                            format!(
2480                                "internal: ROLLBACK MIGRATION produced unexpected kind {other:?}"
2481                            ),
2482                            self.position(),
2483                        )),
2484                    }
2485                } else {
2486                    let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2487                    if self.consume(&Token::To)? {
2488                        let _ = self.consume(&Token::Savepoint)?;
2489                        let name = self.expect_ident()?;
2490                        Ok(SqlCommand::TransactionControl(
2491                            TxnControl::RollbackToSavepoint(name),
2492                        ))
2493                    } else {
2494                        Ok(SqlCommand::TransactionControl(TxnControl::Rollback))
2495                    }
2496                }
2497            }
2498            // SAVEPOINT name
2499            Token::Savepoint => {
2500                self.advance()?;
2501                let name = self.expect_ident()?;
2502                Ok(SqlCommand::TransactionControl(TxnControl::Savepoint(name)))
2503            }
2504            // RELEASE [SAVEPOINT] name
2505            Token::Release => {
2506                self.advance()?;
2507                let _ = self.consume(&Token::Savepoint)?;
2508                let name = self.expect_ident()?;
2509                Ok(SqlCommand::TransactionControl(
2510                    TxnControl::ReleaseSavepoint(name),
2511                ))
2512            }
2513            // VACUUM [FULL] [table]
2514            Token::Vacuum => {
2515                self.advance()?;
2516                let full = self.consume(&Token::Full)?;
2517                let target = if self.check(&Token::Eof) {
2518                    None
2519                } else {
2520                    Some(self.expect_ident()?)
2521                };
2522                Ok(SqlCommand::Maintenance(MaintenanceCommand::Vacuum {
2523                    target,
2524                    full,
2525                }))
2526            }
2527            // REFRESH MATERIALIZED VIEW name
2528            Token::Refresh => {
2529                self.advance()?;
2530                self.expect(Token::Materialized)?;
2531                self.expect(Token::View)?;
2532                let name = self.expect_ident()?;
2533                Ok(SqlCommand::RefreshMaterializedView(
2534                    RefreshMaterializedViewQuery { name },
2535                ))
2536            }
2537            // ANALYZE [table]
2538            Token::Analyze => {
2539                self.advance()?;
2540                let target = if self.check(&Token::Eof) {
2541                    None
2542                } else {
2543                    Some(self.expect_ident()?)
2544                };
2545                Ok(SqlCommand::Maintenance(MaintenanceCommand::Analyze {
2546                    target,
2547                }))
2548            }
2549            // COPY table FROM 'path' [WITH (...)] [DELIMITER 'x'] [HEADER [true|false]]
2550            //
2551            // Accepts both PG-style `WITH (FORMAT csv, HEADER true)` and the
2552            // short-form `DELIMITER ',' HEADER`. The only supported format
2553            // today is CSV.
2554            Token::Copy => {
2555                self.advance()?;
2556                let table = self.expect_ident()?;
2557                self.expect(Token::From)?;
2558                let path = self.parse_string()?;
2559
2560                let mut delimiter: Option<char> = None;
2561                let mut has_header = false;
2562                let format = CopyFormat::Csv;
2563
2564                // Optional `WITH (FORMAT csv, HEADER true, DELIMITER ',')` block.
2565                // `WITH` is a reserved keyword token — accept both the keyword
2566                // form and the ident form that non-CTE callers sometimes emit.
2567                if self.consume(&Token::With)? || self.consume_ident_ci("WITH")? {
2568                    self.expect(Token::LParen)?;
2569                    loop {
2570                        if self.consume(&Token::Format)? || self.consume_ident_ci("FORMAT")? {
2571                            let _ = self.consume(&Token::Eq)?;
2572                            // Only CSV for now — accept the ident and move on.
2573                            let _ = self.expect_ident()?;
2574                        } else if self.consume(&Token::Header)? {
2575                            let _ = self.consume(&Token::Eq)?;
2576                            // Accept `HEADER`, `HEADER = true`, `HEADER = false`,
2577                            // or an ident spelling of true/false.
2578                            has_header = match self.peek().clone() {
2579                                Token::True => {
2580                                    self.advance()?;
2581                                    true
2582                                }
2583                                Token::False => {
2584                                    self.advance()?;
2585                                    false
2586                                }
2587                                Token::Ident(ref n) if n.eq_ignore_ascii_case("true") => {
2588                                    self.advance()?;
2589                                    true
2590                                }
2591                                Token::Ident(ref n) if n.eq_ignore_ascii_case("false") => {
2592                                    self.advance()?;
2593                                    false
2594                                }
2595                                _ => true,
2596                            };
2597                        } else if self.consume(&Token::Delimiter)? {
2598                            let _ = self.consume(&Token::Eq)?;
2599                            let s = self.parse_string()?;
2600                            delimiter = s.chars().next();
2601                        } else {
2602                            break;
2603                        }
2604                        if !self.consume(&Token::Comma)? {
2605                            break;
2606                        }
2607                    }
2608                    self.expect(Token::RParen)?;
2609                }
2610
2611                // Short form clauses outside WITH (in either order).
2612                loop {
2613                    if self.consume(&Token::Delimiter)? {
2614                        let s = self.parse_string()?;
2615                        delimiter = s.chars().next();
2616                    } else if self.consume(&Token::Header)? {
2617                        has_header = true;
2618                    } else {
2619                        break;
2620                    }
2621                }
2622
2623                Ok(SqlCommand::CopyFrom(CopyFromQuery {
2624                    table,
2625                    path,
2626                    format,
2627                    delimiter,
2628                    has_header,
2629                }))
2630            }
2631            other => Err(ParseError::expected(
2632                vec![
2633                    "SELECT",
2634                    "FROM",
2635                    "INSERT",
2636                    "UPDATE",
2637                    "DELETE",
2638                    "EXPLAIN",
2639                    "CREATE",
2640                    "DROP",
2641                    "ALTER",
2642                    "SET",
2643                    "SHOW",
2644                    "BEGIN",
2645                    "COMMIT",
2646                    "ROLLBACK",
2647                    "SAVEPOINT",
2648                    "RELEASE",
2649                    "START",
2650                    "VACUUM",
2651                    "ANALYZE",
2652                    "COPY",
2653                    "REFRESH",
2654                    "DESCRIBE",
2655                    "DESC",
2656                ],
2657                other,
2658                self.position(),
2659            )),
2660        }
2661    }
2662}