// reddb_server/storage/query/sql.rs

1use crate::catalog::CollectionModel;
2use crate::storage::query::ast::{
3    AlterQueueQuery, AlterTableQuery, AlterUserStmt, ApplyMigrationQuery, AskQuery, BinOp,
4    CompareOp, ConfigCommand, CopyFormat, CopyFromQuery, CreateCollectionQuery,
5    CreateForeignTableQuery, CreateIndexQuery, CreateMigrationQuery, CreatePolicyQuery,
6    CreateQueueQuery, CreateSchemaQuery, CreateSequenceQuery, CreateServerQuery, CreateTableQuery,
7    CreateTimeSeriesQuery, CreateTreeQuery, CreateVectorQuery, CreateViewQuery, DeleteQuery,
8    DropCollectionQuery, DropDocumentQuery, DropForeignTableQuery, DropGraphQuery, DropIndexQuery,
9    DropKvQuery, DropPolicyQuery, DropQueueQuery, DropSchemaQuery, DropSequenceQuery,
10    DropServerQuery, DropTableQuery, DropTimeSeriesQuery, DropTreeQuery, DropVectorQuery,
11    DropViewQuery, EventsBackfillQuery, ExplainAlterQuery, ExplainMigrationQuery, Expr, FieldRef,
12    Filter, ForeignColumnDef, GrantStmt, GraphCommand, GraphQuery, HybridQuery, InsertQuery,
13    JoinQuery, KvCommand, MaintenanceCommand, PathQuery, PolicyAction, ProbabilisticCommand,
14    QueryExpr, QueueCommand, QueueSelectQuery, RefreshMaterializedViewQuery, RevokeStmt,
15    RollbackMigrationQuery, SearchCommand, Span, TableQuery, TreeCommand, TruncateQuery,
16    TxnControl, UpdateQuery, VectorQuery,
17};
18use crate::storage::query::parser::{ParseError, Parser, SafeTokenDisplay};
19use crate::storage::query::sql_lowering::filter_to_expr;
20use crate::storage::query::Token;
21use crate::storage::schema::Value;
22
23/// Canonical SQL frontend command surface.
24///
25/// This is the single entrypoint for SQL/RQL-style commands before they are
26/// lowered into the broader multi-backend `QueryExpr` space.
27#[derive(Debug, Clone)]
28pub enum SqlStatement {
29    Query(SqlQuery),
30    Mutation(SqlMutation),
31    Schema(SqlSchemaCommand),
32    Admin(SqlAdminCommand),
33}
34
35#[derive(Debug, Clone)]
36#[allow(clippy::large_enum_variant)]
37pub enum FrontendStatement {
38    Sql(SqlStatement),
39    Graph(GraphQuery),
40    GraphCommand(GraphCommand),
41    Path(PathQuery),
42    Vector(VectorQuery),
43    Hybrid(HybridQuery),
44    Search(SearchCommand),
45    Ask(AskQuery),
46    QueueSelect(QueueSelectQuery),
47    QueueCommand(QueueCommand),
48    EventsBackfill(EventsBackfillQuery),
49    EventsBackfillStatus { collection: String },
50    TreeCommand(TreeCommand),
51    ProbabilisticCommand(ProbabilisticCommand),
52    KvCommand(KvCommand),
53    ConfigCommand(ConfigCommand),
54}
55
56#[derive(Debug, Clone)]
57pub enum SqlCommand {
58    Select(TableQuery),
59    Join(JoinQuery),
60    Insert(InsertQuery),
61    Update(UpdateQuery),
62    Delete(DeleteQuery),
63    ExplainAlter(ExplainAlterQuery),
64    CreateTable(CreateTableQuery),
65    CreateCollection(CreateCollectionQuery),
66    CreateVector(CreateVectorQuery),
67    DropTable(DropTableQuery),
68    DropGraph(DropGraphQuery),
69    DropVector(DropVectorQuery),
70    DropDocument(DropDocumentQuery),
71    DropKv(DropKvQuery),
72    DropCollection(DropCollectionQuery),
73    Truncate(TruncateQuery),
74    AlterTable(AlterTableQuery),
75    CreateIndex(CreateIndexQuery),
76    DropIndex(DropIndexQuery),
77    CreateTimeSeries(CreateTimeSeriesQuery),
78    DropTimeSeries(DropTimeSeriesQuery),
79    CreateQueue(CreateQueueQuery),
80    AlterQueue(AlterQueueQuery),
81    DropQueue(DropQueueQuery),
82    CreateTree(CreateTreeQuery),
83    DropTree(DropTreeQuery),
84    Probabilistic(ProbabilisticCommand),
85    SetConfig {
86        key: String,
87        value: Value,
88    },
89    ShowConfig {
90        prefix: Option<String>,
91    },
92    SetSecret {
93        key: String,
94        value: Value,
95    },
96    DeleteSecret {
97        key: String,
98    },
99    ShowSecrets {
100        prefix: Option<String>,
101    },
102    SetTenant(Option<String>),
103    ShowTenant,
104    TransactionControl(TxnControl),
105    Maintenance(MaintenanceCommand),
106    CreateSchema(CreateSchemaQuery),
107    DropSchema(DropSchemaQuery),
108    CreateSequence(CreateSequenceQuery),
109    DropSequence(DropSequenceQuery),
110    CopyFrom(CopyFromQuery),
111    CreateView(CreateViewQuery),
112    DropView(DropViewQuery),
113    RefreshMaterializedView(RefreshMaterializedViewQuery),
114    CreatePolicy(CreatePolicyQuery),
115    DropPolicy(DropPolicyQuery),
116    CreateServer(CreateServerQuery),
117    DropServer(DropServerQuery),
118    CreateForeignTable(CreateForeignTableQuery),
119    DropForeignTable(DropForeignTableQuery),
120    /// `GRANT … ON … TO …`
121    Grant(GrantStmt),
122    /// `REVOKE … ON … FROM …`
123    Revoke(RevokeStmt),
124    /// `ALTER USER name <attrs>`
125    AlterUser(AlterUserStmt),
126    /// IAM policy DDL (CREATE POLICY '...' AS '...', DROP POLICY '...',
127    /// ATTACH/DETACH POLICY, SHOW POLICIES, SIMULATE, SHOW EFFECTIVE
128    /// PERMISSIONS). Stored as a pre-built QueryExpr so the dispatcher
129    /// can route the multitude of shapes through a single arm.
130    IamPolicy(QueryExpr),
131    CreateMigration(CreateMigrationQuery),
132    ApplyMigration(ApplyMigrationQuery),
133    RollbackMigration(RollbackMigrationQuery),
134    ExplainMigration(ExplainMigrationQuery),
135}
136
137fn collection_model_filter(model: &str) -> Filter {
138    Filter::Compare {
139        field: FieldRef::column("", "model"),
140        op: CompareOp::Eq,
141        value: Value::Text(model.to_string().into()),
142    }
143}
144
145fn add_table_filter(query: &mut TableQuery, filter: Filter) {
146    let combined = match query.filter.take() {
147        Some(existing) => existing.and(filter),
148        None => filter,
149    };
150    query.where_expr = Some(filter_to_expr(&combined));
151    query.filter = Some(combined);
152}
153
154fn parse_show_collections_by_model(
155    parser: &mut Parser<'_>,
156    model: &str,
157) -> Result<TableQuery, ParseError> {
158    let mut query = TableQuery::new("red.collections");
159    parser.parse_table_clauses(&mut query)?;
160    add_table_filter(&mut query, collection_model_filter(model));
161    Ok(query)
162}
163
164#[derive(Debug, Clone)]
165#[allow(clippy::large_enum_variant)]
166pub enum SqlQuery {
167    Select(TableQuery),
168    Join(JoinQuery),
169}
170
171#[derive(Debug, Clone)]
172pub enum SqlMutation {
173    Insert(InsertQuery),
174    Update(UpdateQuery),
175    Delete(DeleteQuery),
176}
177
178#[derive(Debug, Clone)]
179pub enum SqlSchemaCommand {
180    ExplainAlter(ExplainAlterQuery),
181    CreateTable(CreateTableQuery),
182    CreateCollection(CreateCollectionQuery),
183    CreateVector(CreateVectorQuery),
184    DropTable(DropTableQuery),
185    DropGraph(DropGraphQuery),
186    DropVector(DropVectorQuery),
187    DropDocument(DropDocumentQuery),
188    DropKv(DropKvQuery),
189    DropCollection(DropCollectionQuery),
190    Truncate(TruncateQuery),
191    AlterTable(AlterTableQuery),
192    CreateIndex(CreateIndexQuery),
193    DropIndex(DropIndexQuery),
194    CreateTimeSeries(CreateTimeSeriesQuery),
195    DropTimeSeries(DropTimeSeriesQuery),
196    CreateQueue(CreateQueueQuery),
197    AlterQueue(AlterQueueQuery),
198    DropQueue(DropQueueQuery),
199    CreateTree(CreateTreeQuery),
200    DropTree(DropTreeQuery),
201    Probabilistic(ProbabilisticCommand),
202    CreateSchema(CreateSchemaQuery),
203    DropSchema(DropSchemaQuery),
204    CreateSequence(CreateSequenceQuery),
205    DropSequence(DropSequenceQuery),
206    CopyFrom(CopyFromQuery),
207    CreateView(CreateViewQuery),
208    DropView(DropViewQuery),
209    RefreshMaterializedView(RefreshMaterializedViewQuery),
210    CreatePolicy(CreatePolicyQuery),
211    DropPolicy(DropPolicyQuery),
212    CreateServer(CreateServerQuery),
213    DropServer(DropServerQuery),
214    CreateForeignTable(CreateForeignTableQuery),
215    DropForeignTable(DropForeignTableQuery),
216    CreateMigration(CreateMigrationQuery),
217    ApplyMigration(ApplyMigrationQuery),
218    RollbackMigration(RollbackMigrationQuery),
219    ExplainMigration(ExplainMigrationQuery),
220}
221
222#[derive(Debug, Clone)]
223#[allow(clippy::large_enum_variant)]
224pub enum SqlAdminCommand {
225    SetConfig { key: String, value: Value },
226    ShowConfig { prefix: Option<String> },
227    SetSecret { key: String, value: Value },
228    DeleteSecret { key: String },
229    ShowSecrets { prefix: Option<String> },
230    SetTenant(Option<String>),
231    ShowTenant,
232    TransactionControl(TxnControl),
233    Maintenance(MaintenanceCommand),
234    Grant(GrantStmt),
235    Revoke(RevokeStmt),
236    AlterUser(AlterUserStmt),
237    IamPolicy(QueryExpr),
238}
239
240impl SqlStatement {
241    pub fn into_command(self) -> SqlCommand {
242        match self {
243            SqlStatement::Query(SqlQuery::Select(query)) => SqlCommand::Select(query),
244            SqlStatement::Query(SqlQuery::Join(query)) => SqlCommand::Join(query),
245            SqlStatement::Mutation(SqlMutation::Insert(query)) => SqlCommand::Insert(query),
246            SqlStatement::Mutation(SqlMutation::Update(query)) => SqlCommand::Update(query),
247            SqlStatement::Mutation(SqlMutation::Delete(query)) => SqlCommand::Delete(query),
248            SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query)) => {
249                SqlCommand::ExplainAlter(query)
250            }
251            SqlStatement::Schema(SqlSchemaCommand::CreateTable(query)) => {
252                SqlCommand::CreateTable(query)
253            }
254            SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query)) => {
255                SqlCommand::CreateCollection(query)
256            }
257            SqlStatement::Schema(SqlSchemaCommand::CreateVector(query)) => {
258                SqlCommand::CreateVector(query)
259            }
260            SqlStatement::Schema(SqlSchemaCommand::DropTable(query)) => {
261                SqlCommand::DropTable(query)
262            }
263            SqlStatement::Schema(SqlSchemaCommand::DropGraph(query)) => {
264                SqlCommand::DropGraph(query)
265            }
266            SqlStatement::Schema(SqlSchemaCommand::DropVector(query)) => {
267                SqlCommand::DropVector(query)
268            }
269            SqlStatement::Schema(SqlSchemaCommand::DropDocument(query)) => {
270                SqlCommand::DropDocument(query)
271            }
272            SqlStatement::Schema(SqlSchemaCommand::DropKv(query)) => SqlCommand::DropKv(query),
273            SqlStatement::Schema(SqlSchemaCommand::DropCollection(query)) => {
274                SqlCommand::DropCollection(query)
275            }
276            SqlStatement::Schema(SqlSchemaCommand::Truncate(query)) => SqlCommand::Truncate(query),
277            SqlStatement::Schema(SqlSchemaCommand::AlterTable(query)) => {
278                SqlCommand::AlterTable(query)
279            }
280            SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query)) => {
281                SqlCommand::CreateIndex(query)
282            }
283            SqlStatement::Schema(SqlSchemaCommand::DropIndex(query)) => {
284                SqlCommand::DropIndex(query)
285            }
286            SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query)) => {
287                SqlCommand::CreateTimeSeries(query)
288            }
289            SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query)) => {
290                SqlCommand::DropTimeSeries(query)
291            }
292            SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query)) => {
293                SqlCommand::CreateQueue(query)
294            }
295            SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query)) => {
296                SqlCommand::AlterQueue(query)
297            }
298            SqlStatement::Schema(SqlSchemaCommand::DropQueue(query)) => {
299                SqlCommand::DropQueue(query)
300            }
301            SqlStatement::Schema(SqlSchemaCommand::CreateTree(query)) => {
302                SqlCommand::CreateTree(query)
303            }
304            SqlStatement::Schema(SqlSchemaCommand::DropTree(query)) => SqlCommand::DropTree(query),
305            SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command)) => {
306                SqlCommand::Probabilistic(command)
307            }
308            SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value }) => {
309                SqlCommand::SetConfig { key, value }
310            }
311            SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix }) => {
312                SqlCommand::ShowConfig { prefix }
313            }
314            SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value }) => {
315                SqlCommand::SetSecret { key, value }
316            }
317            SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key }) => {
318                SqlCommand::DeleteSecret { key }
319            }
320            SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix }) => {
321                SqlCommand::ShowSecrets { prefix }
322            }
323            SqlStatement::Admin(SqlAdminCommand::SetTenant(value)) => SqlCommand::SetTenant(value),
324            SqlStatement::Admin(SqlAdminCommand::ShowTenant) => SqlCommand::ShowTenant,
325            SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl)) => {
326                SqlCommand::TransactionControl(ctl)
327            }
328            SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)) => SqlCommand::Maintenance(cmd),
329            SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)) => SqlCommand::CreateSchema(q),
330            SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)) => SqlCommand::DropSchema(q),
331            SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q)) => {
332                SqlCommand::CreateSequence(q)
333            }
334            SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)) => SqlCommand::DropSequence(q),
335            SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)) => SqlCommand::CopyFrom(q),
336            SqlStatement::Schema(SqlSchemaCommand::CreateView(q)) => SqlCommand::CreateView(q),
337            SqlStatement::Schema(SqlSchemaCommand::DropView(q)) => SqlCommand::DropView(q),
338            SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q)) => {
339                SqlCommand::RefreshMaterializedView(q)
340            }
341            SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)) => SqlCommand::CreatePolicy(q),
342            SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)) => SqlCommand::DropPolicy(q),
343            SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)) => SqlCommand::CreateServer(q),
344            SqlStatement::Schema(SqlSchemaCommand::DropServer(q)) => SqlCommand::DropServer(q),
345            SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q)) => {
346                SqlCommand::CreateForeignTable(q)
347            }
348            SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q)) => {
349                SqlCommand::DropForeignTable(q)
350            }
351            SqlStatement::Admin(SqlAdminCommand::Grant(s)) => SqlCommand::Grant(s),
352            SqlStatement::Admin(SqlAdminCommand::Revoke(s)) => SqlCommand::Revoke(s),
353            SqlStatement::Admin(SqlAdminCommand::AlterUser(s)) => SqlCommand::AlterUser(s),
354            SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)) => SqlCommand::IamPolicy(e),
355            SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q)) => {
356                SqlCommand::CreateMigration(q)
357            }
358            SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q)) => {
359                SqlCommand::ApplyMigration(q)
360            }
361            SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q)) => {
362                SqlCommand::RollbackMigration(q)
363            }
364            SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q)) => {
365                SqlCommand::ExplainMigration(q)
366            }
367        }
368    }
369
370    pub fn into_query_expr(self) -> QueryExpr {
371        self.into_command().into_query_expr()
372    }
373}
374
375impl FrontendStatement {
376    pub fn into_query_expr(self) -> QueryExpr {
377        match self {
378            FrontendStatement::Sql(statement) => statement.into_query_expr(),
379            FrontendStatement::Graph(query) => QueryExpr::Graph(query),
380            FrontendStatement::GraphCommand(command) => QueryExpr::GraphCommand(command),
381            FrontendStatement::Path(query) => QueryExpr::Path(query),
382            FrontendStatement::Vector(query) => QueryExpr::Vector(query),
383            FrontendStatement::Hybrid(query) => QueryExpr::Hybrid(query),
384            FrontendStatement::Search(command) => QueryExpr::SearchCommand(command),
385            FrontendStatement::Ask(query) => QueryExpr::Ask(query),
386            FrontendStatement::QueueSelect(query) => QueryExpr::QueueSelect(query),
387            FrontendStatement::QueueCommand(command) => QueryExpr::QueueCommand(command),
388            FrontendStatement::EventsBackfill(query) => QueryExpr::EventsBackfill(query),
389            FrontendStatement::EventsBackfillStatus { collection } => {
390                QueryExpr::EventsBackfillStatus { collection }
391            }
392            FrontendStatement::TreeCommand(command) => QueryExpr::TreeCommand(command),
393            FrontendStatement::ProbabilisticCommand(command) => {
394                QueryExpr::ProbabilisticCommand(command)
395            }
396            FrontendStatement::KvCommand(command) => QueryExpr::KvCommand(command),
397            FrontendStatement::ConfigCommand(command) => QueryExpr::ConfigCommand(command),
398        }
399    }
400}
401
402pub fn parse_frontend(input: &str) -> Result<FrontendStatement, ParseError> {
403    let mut parser = Parser::new(input)?;
404    let statement = parser.parse_frontend_statement()?;
405    if !parser.check(&Token::Eof) {
406        return Err(ParseError::new(
407            // F-05: `Token::Ident` / `Token::String` / `Token::JsonLiteral`
408            // Display arms emit raw user bytes. Render via `{:?}` so
409            // embedded CR/LF/NUL/quotes are escaped before the message
410            // reaches downstream JSON / audit / log / gRPC sinks.
411            format!("Unexpected token after query: {:?}", parser.current.token),
412            parser.position(),
413        ));
414    }
415    Ok(statement)
416}
417
418impl SqlCommand {
419    pub fn into_query_expr(self) -> QueryExpr {
420        match self {
421            SqlCommand::Select(query) => QueryExpr::Table(query),
422            SqlCommand::Join(query) => QueryExpr::Join(query),
423            SqlCommand::Insert(query) => QueryExpr::Insert(query),
424            SqlCommand::Update(query) => QueryExpr::Update(query),
425            SqlCommand::Delete(query) => QueryExpr::Delete(query),
426            SqlCommand::ExplainAlter(query) => QueryExpr::ExplainAlter(query),
427            SqlCommand::CreateTable(query) => QueryExpr::CreateTable(query),
428            SqlCommand::CreateCollection(query) => QueryExpr::CreateCollection(query),
429            SqlCommand::CreateVector(query) => QueryExpr::CreateVector(query),
430            SqlCommand::DropTable(query) => QueryExpr::DropTable(query),
431            SqlCommand::DropGraph(query) => QueryExpr::DropGraph(query),
432            SqlCommand::DropVector(query) => QueryExpr::DropVector(query),
433            SqlCommand::DropDocument(query) => QueryExpr::DropDocument(query),
434            SqlCommand::DropKv(query) => QueryExpr::DropKv(query),
435            SqlCommand::DropCollection(query) => QueryExpr::DropCollection(query),
436            SqlCommand::Truncate(query) => QueryExpr::Truncate(query),
437            SqlCommand::AlterTable(query) => QueryExpr::AlterTable(query),
438            SqlCommand::CreateIndex(query) => QueryExpr::CreateIndex(query),
439            SqlCommand::DropIndex(query) => QueryExpr::DropIndex(query),
440            SqlCommand::CreateTimeSeries(query) => QueryExpr::CreateTimeSeries(query),
441            SqlCommand::DropTimeSeries(query) => QueryExpr::DropTimeSeries(query),
442            SqlCommand::CreateQueue(query) => QueryExpr::CreateQueue(query),
443            SqlCommand::AlterQueue(query) => QueryExpr::AlterQueue(query),
444            SqlCommand::DropQueue(query) => QueryExpr::DropQueue(query),
445            SqlCommand::CreateTree(query) => QueryExpr::CreateTree(query),
446            SqlCommand::DropTree(query) => QueryExpr::DropTree(query),
447            SqlCommand::Probabilistic(command) => QueryExpr::ProbabilisticCommand(command),
448            SqlCommand::SetConfig { key, value } => QueryExpr::SetConfig { key, value },
449            SqlCommand::ShowConfig { prefix } => QueryExpr::ShowConfig { prefix },
450            SqlCommand::SetSecret { key, value } => QueryExpr::SetSecret { key, value },
451            SqlCommand::DeleteSecret { key } => QueryExpr::DeleteSecret { key },
452            SqlCommand::ShowSecrets { prefix } => QueryExpr::ShowSecrets { prefix },
453            SqlCommand::SetTenant(value) => QueryExpr::SetTenant(value),
454            SqlCommand::ShowTenant => QueryExpr::ShowTenant,
455            SqlCommand::TransactionControl(ctl) => QueryExpr::TransactionControl(ctl),
456            SqlCommand::Maintenance(cmd) => QueryExpr::MaintenanceCommand(cmd),
457            SqlCommand::CreateSchema(q) => QueryExpr::CreateSchema(q),
458            SqlCommand::DropSchema(q) => QueryExpr::DropSchema(q),
459            SqlCommand::CreateSequence(q) => QueryExpr::CreateSequence(q),
460            SqlCommand::DropSequence(q) => QueryExpr::DropSequence(q),
461            SqlCommand::CopyFrom(q) => QueryExpr::CopyFrom(q),
462            SqlCommand::CreateView(q) => QueryExpr::CreateView(q),
463            SqlCommand::DropView(q) => QueryExpr::DropView(q),
464            SqlCommand::RefreshMaterializedView(q) => QueryExpr::RefreshMaterializedView(q),
465            SqlCommand::CreatePolicy(q) => QueryExpr::CreatePolicy(q),
466            SqlCommand::DropPolicy(q) => QueryExpr::DropPolicy(q),
467            SqlCommand::CreateServer(q) => QueryExpr::CreateServer(q),
468            SqlCommand::DropServer(q) => QueryExpr::DropServer(q),
469            SqlCommand::CreateForeignTable(q) => QueryExpr::CreateForeignTable(q),
470            SqlCommand::DropForeignTable(q) => QueryExpr::DropForeignTable(q),
471            SqlCommand::Grant(s) => QueryExpr::Grant(s),
472            SqlCommand::Revoke(s) => QueryExpr::Revoke(s),
473            SqlCommand::AlterUser(s) => QueryExpr::AlterUser(s),
474            SqlCommand::IamPolicy(e) => e,
475            SqlCommand::CreateMigration(q) => QueryExpr::CreateMigration(q),
476            SqlCommand::ApplyMigration(q) => QueryExpr::ApplyMigration(q),
477            SqlCommand::RollbackMigration(q) => QueryExpr::RollbackMigration(q),
478            SqlCommand::ExplainMigration(q) => QueryExpr::ExplainMigration(q),
479        }
480    }
481
482    pub fn into_statement(self) -> SqlStatement {
483        match self {
484            SqlCommand::Select(query) => SqlStatement::Query(SqlQuery::Select(query)),
485            SqlCommand::Join(query) => SqlStatement::Query(SqlQuery::Join(query)),
486            SqlCommand::Insert(query) => SqlStatement::Mutation(SqlMutation::Insert(query)),
487            SqlCommand::Update(query) => SqlStatement::Mutation(SqlMutation::Update(query)),
488            SqlCommand::Delete(query) => SqlStatement::Mutation(SqlMutation::Delete(query)),
489            SqlCommand::ExplainAlter(query) => {
490                SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query))
491            }
492            SqlCommand::CreateTable(query) => {
493                SqlStatement::Schema(SqlSchemaCommand::CreateTable(query))
494            }
495            SqlCommand::CreateCollection(query) => {
496                SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query))
497            }
498            SqlCommand::CreateVector(query) => {
499                SqlStatement::Schema(SqlSchemaCommand::CreateVector(query))
500            }
501            SqlCommand::DropTable(query) => {
502                SqlStatement::Schema(SqlSchemaCommand::DropTable(query))
503            }
504            SqlCommand::DropGraph(query) => {
505                SqlStatement::Schema(SqlSchemaCommand::DropGraph(query))
506            }
507            SqlCommand::DropVector(query) => {
508                SqlStatement::Schema(SqlSchemaCommand::DropVector(query))
509            }
510            SqlCommand::DropDocument(query) => {
511                SqlStatement::Schema(SqlSchemaCommand::DropDocument(query))
512            }
513            SqlCommand::DropKv(query) => SqlStatement::Schema(SqlSchemaCommand::DropKv(query)),
514            SqlCommand::DropCollection(query) => {
515                SqlStatement::Schema(SqlSchemaCommand::DropCollection(query))
516            }
517            SqlCommand::Truncate(query) => SqlStatement::Schema(SqlSchemaCommand::Truncate(query)),
518            SqlCommand::AlterTable(query) => {
519                SqlStatement::Schema(SqlSchemaCommand::AlterTable(query))
520            }
521            SqlCommand::CreateIndex(query) => {
522                SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query))
523            }
524            SqlCommand::DropIndex(query) => {
525                SqlStatement::Schema(SqlSchemaCommand::DropIndex(query))
526            }
527            SqlCommand::CreateTimeSeries(query) => {
528                SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query))
529            }
530            SqlCommand::DropTimeSeries(query) => {
531                SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query))
532            }
533            SqlCommand::CreateQueue(query) => {
534                SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query))
535            }
536            SqlCommand::AlterQueue(query) => {
537                SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query))
538            }
539            SqlCommand::DropQueue(query) => {
540                SqlStatement::Schema(SqlSchemaCommand::DropQueue(query))
541            }
542            SqlCommand::CreateTree(query) => {
543                SqlStatement::Schema(SqlSchemaCommand::CreateTree(query))
544            }
545            SqlCommand::DropTree(query) => SqlStatement::Schema(SqlSchemaCommand::DropTree(query)),
546            SqlCommand::Probabilistic(command) => {
547                SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command))
548            }
549            SqlCommand::SetConfig { key, value } => {
550                SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value })
551            }
552            SqlCommand::ShowConfig { prefix } => {
553                SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix })
554            }
555            SqlCommand::SetSecret { key, value } => {
556                SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value })
557            }
558            SqlCommand::DeleteSecret { key } => {
559                SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key })
560            }
561            SqlCommand::ShowSecrets { prefix } => {
562                SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix })
563            }
564            SqlCommand::SetTenant(value) => SqlStatement::Admin(SqlAdminCommand::SetTenant(value)),
565            SqlCommand::ShowTenant => SqlStatement::Admin(SqlAdminCommand::ShowTenant),
566            SqlCommand::TransactionControl(ctl) => {
567                SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl))
568            }
569            SqlCommand::Maintenance(cmd) => SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)),
570            SqlCommand::CreateSchema(q) => SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)),
571            SqlCommand::DropSchema(q) => SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)),
572            SqlCommand::CreateSequence(q) => {
573                SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q))
574            }
575            SqlCommand::DropSequence(q) => SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)),
576            SqlCommand::CopyFrom(q) => SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)),
577            SqlCommand::CreateView(q) => SqlStatement::Schema(SqlSchemaCommand::CreateView(q)),
578            SqlCommand::DropView(q) => SqlStatement::Schema(SqlSchemaCommand::DropView(q)),
579            SqlCommand::RefreshMaterializedView(q) => {
580                SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q))
581            }
582            SqlCommand::CreatePolicy(q) => SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)),
583            SqlCommand::DropPolicy(q) => SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)),
584            SqlCommand::CreateServer(q) => SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)),
585            SqlCommand::DropServer(q) => SqlStatement::Schema(SqlSchemaCommand::DropServer(q)),
586            SqlCommand::CreateForeignTable(q) => {
587                SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q))
588            }
589            SqlCommand::DropForeignTable(q) => {
590                SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q))
591            }
592            SqlCommand::Grant(s) => SqlStatement::Admin(SqlAdminCommand::Grant(s)),
593            SqlCommand::Revoke(s) => SqlStatement::Admin(SqlAdminCommand::Revoke(s)),
594            SqlCommand::AlterUser(s) => SqlStatement::Admin(SqlAdminCommand::AlterUser(s)),
595            SqlCommand::IamPolicy(e) => SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)),
596            SqlCommand::CreateMigration(q) => {
597                SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q))
598            }
599            SqlCommand::ApplyMigration(q) => {
600                SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q))
601            }
602            SqlCommand::RollbackMigration(q) => {
603                SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q))
604            }
605            SqlCommand::ExplainMigration(q) => {
606                SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q))
607            }
608        }
609    }
610}
611
612impl<'a> Parser<'a> {
613    fn parse_events_command(&mut self) -> Result<QueryExpr, ParseError> {
614        self.expect_ident()?; // EVENTS
615        if self.consume_ident_ci("STATUS")? {
616            let mut query = TableQuery::new("red.subscriptions");
617            let collection = match self.peek().clone() {
618                Token::Ident(name) => {
619                    self.advance()?;
620                    Some(name)
621                }
622                Token::String(name) => {
623                    self.advance()?;
624                    Some(name)
625                }
626                _ => None,
627            };
628            self.parse_table_clauses(&mut query)?;
629            if let Some(collection) = collection {
630                let filter = Filter::compare(
631                    FieldRef::column("red.subscriptions", "collection"),
632                    CompareOp::Eq,
633                    Value::text(collection),
634                );
635                let expr = filter_to_expr(&filter);
636                query.where_expr = Some(match query.where_expr.take() {
637                    Some(existing) => Expr::binop(BinOp::And, existing, expr),
638                    None => expr,
639                });
640                query.filter = Some(match query.filter.take() {
641                    Some(existing) => existing.and(filter),
642                    None => filter,
643                });
644            }
645            return Ok(QueryExpr::Table(query));
646        }
647
648        if !self.consume_ident_ci("BACKFILL")? {
649            return Err(ParseError::expected(
650                vec!["BACKFILL", "STATUS"],
651                self.peek(),
652                self.position(),
653            ));
654        }
655
656        if self.consume_ident_ci("STATUS")? {
657            let collection = self.expect_ident()?;
658            return Ok(QueryExpr::EventsBackfillStatus { collection });
659        }
660
661        let collection = self.expect_ident()?;
662        let where_filter = if self.consume(&Token::Where)? {
663            let mut parts = Vec::new();
664            while !self.check(&Token::Eof) && !self.check(&Token::To) {
665                parts.push(self.peek().to_string());
666                self.advance()?;
667            }
668            if parts.is_empty() {
669                return Err(ParseError::expected(
670                    vec!["predicate"],
671                    self.peek(),
672                    self.position(),
673                ));
674            }
675            Some(parts.join(" "))
676        } else {
677            None
678        };
679
680        self.expect(Token::To)?;
681        let target_queue = self.expect_ident()?;
682        let limit = if self.consume(&Token::Limit)? {
683            Some(self.parse_positive_integer("LIMIT")? as u64)
684        } else {
685            None
686        };
687
688        Ok(QueryExpr::EventsBackfill(EventsBackfillQuery {
689            collection,
690            where_filter,
691            target_queue,
692            limit,
693        }))
694    }
695
696    /// Parse an optional `OPTIONS (key 'value', key2 'value2', ...)` clause
697    /// used by Phase 3.2 FDW DDL statements. Returns an empty vec when the
698    /// clause is absent. Values are always single-quoted string literals —
699    /// consistent with PG's generic-options model.
700    pub(crate) fn parse_fdw_options_clause(&mut self) -> Result<Vec<(String, String)>, ParseError> {
701        if !self.consume(&Token::Options)? {
702            return Ok(Vec::new());
703        }
704        self.expect(Token::LParen)?;
705        let mut out: Vec<(String, String)> = Vec::new();
706        loop {
707            // Option keys frequently collide with reserved words
708            // (`path`, `format`, `delimiter`, `header`, …) — accept
709            // the keyword form and lowercase it so downstream
710            // option-name matching stays case-insensitive.
711            let was_ident = matches!(self.peek(), Token::Ident(_));
712            let raw = self.expect_ident_or_keyword()?;
713            let key = if was_ident {
714                raw
715            } else {
716                raw.to_ascii_lowercase()
717            };
718            // Value is a single-quoted string literal.
719            let value = self.parse_string()?;
720            out.push((key, value));
721            if !self.consume(&Token::Comma)? {
722                break;
723            }
724        }
725        self.expect(Token::RParen)?;
726        Ok(out)
727    }
728
729    /// Parse any top-level frontend statement through a single shared surface.
730    pub fn parse_frontend_statement(&mut self) -> Result<FrontendStatement, ParseError> {
731        match self.peek() {
732            Token::Select => match self.parse_select_query()? {
733                QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
734                    SqlQuery::Select(query),
735                ))),
736                QueryExpr::Join(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
737                    SqlQuery::Join(query),
738                ))),
739                QueryExpr::QueueSelect(query) => Ok(FrontendStatement::QueueSelect(query)),
740                other => Err(ParseError::new(
741                    format!("internal: SELECT produced unexpected query kind {other:?}"),
742                    self.position(),
743                )),
744            },
745            Token::From
746            | Token::Insert
747            | Token::Update
748            | Token::Truncate
749            | Token::Create
750            | Token::Drop
751            | Token::Alter
752            | Token::Set
753            | Token::Begin
754            | Token::Commit
755            | Token::Rollback
756            | Token::Savepoint
757            | Token::Release
758            | Token::Start
759            | Token::Vacuum
760            | Token::Analyze
761            | Token::Copy
762            | Token::Refresh => self.parse_sql_statement().map(FrontendStatement::Sql),
763            Token::Explain => {
764                if matches!(
765                    self.peek_next()?,
766                    Token::Ident(name) if name.eq_ignore_ascii_case("ASK")
767                ) {
768                    match self.parse_explain_ask_query()? {
769                        QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
770                        other => Err(ParseError::new(
771                            format!(
772                                "internal: EXPLAIN ASK produced unexpected query kind {other:?}"
773                            ),
774                            self.position(),
775                        )),
776                    }
777                } else {
778                    self.parse_sql_statement().map(FrontendStatement::Sql)
779                }
780            }
781            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
782                self.parse_sql_statement().map(FrontendStatement::Sql)
783            }
784            Token::Ident(name)
785                if name.eq_ignore_ascii_case("GRANT")
786                    || name.eq_ignore_ascii_case("REVOKE")
787                    || name.eq_ignore_ascii_case("SIMULATE")
788                    || name.eq_ignore_ascii_case("APPLY") =>
789            {
790                self.parse_sql_statement().map(FrontendStatement::Sql)
791            }
792            Token::Ident(name) if name.eq_ignore_ascii_case("WATCH") => {
793                self.advance()?;
794                if matches!(
795                    self.peek(),
796                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
797                ) {
798                    match self.parse_config_watch_after_watch()? {
799                        QueryExpr::ConfigCommand(command) => {
800                            Ok(FrontendStatement::ConfigCommand(command))
801                        }
802                        other => Err(ParseError::new(
803                            format!(
804                                "internal: WATCH CONFIG produced unexpected query kind {other:?}"
805                            ),
806                            self.position(),
807                        )),
808                    }
809                } else if matches!(
810                    self.peek(),
811                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
812                ) {
813                    match self.parse_vault_watch_after_watch()? {
814                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
815                        other => Err(ParseError::new(
816                            format!(
817                                "internal: WATCH VAULT produced unexpected query kind {other:?}"
818                            ),
819                            self.position(),
820                        )),
821                    }
822                } else {
823                    match self.parse_kv_watch(crate::catalog::CollectionModel::Kv)? {
824                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
825                        other => Err(ParseError::new(
826                            format!("internal: WATCH produced unexpected query kind {other:?}"),
827                            self.position(),
828                        )),
829                    }
830                }
831            }
832            Token::List => {
833                self.advance()?;
834                if matches!(
835                    self.peek(),
836                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
837                ) {
838                    match self.parse_config_list_after_list()? {
839                        QueryExpr::ConfigCommand(command) => {
840                            Ok(FrontendStatement::ConfigCommand(command))
841                        }
842                        other => Err(ParseError::new(
843                            format!(
844                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
845                            ),
846                            self.position(),
847                        )),
848                    }
849                } else if matches!(
850                    self.peek(),
851                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
852                ) {
853                    match self.parse_vault_list_after_list()? {
854                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
855                        other => Err(ParseError::new(
856                            format!(
857                                "internal: LIST VAULT produced unexpected query kind {other:?}"
858                            ),
859                            self.position(),
860                        )),
861                    }
862                } else {
863                    Err(ParseError::expected(
864                        vec!["CONFIG", "VAULT"],
865                        self.peek(),
866                        self.position(),
867                    ))
868                }
869            }
870            Token::Ident(name) if name.eq_ignore_ascii_case("LIST") => {
871                self.advance()?;
872                if matches!(
873                    self.peek(),
874                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
875                ) {
876                    match self.parse_config_list_after_list()? {
877                        QueryExpr::ConfigCommand(command) => {
878                            Ok(FrontendStatement::ConfigCommand(command))
879                        }
880                        other => Err(ParseError::new(
881                            format!(
882                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
883                            ),
884                            self.position(),
885                        )),
886                    }
887                } else if matches!(
888                    self.peek(),
889                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
890                ) {
891                    match self.parse_vault_list_after_list()? {
892                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
893                        other => Err(ParseError::new(
894                            format!(
895                                "internal: LIST VAULT produced unexpected query kind {other:?}"
896                            ),
897                            self.position(),
898                        )),
899                    }
900                } else {
901                    Err(ParseError::expected(
902                        vec!["CONFIG", "VAULT"],
903                        self.peek(),
904                        self.position(),
905                    ))
906                }
907            }
908            Token::Ident(name) if name.eq_ignore_ascii_case("INVALIDATE") => {
909                if matches!(
910                    self.peek_next()?,
911                    Token::Ident(next) if next.eq_ignore_ascii_case("CONFIG")
912                ) {
913                    match self.parse_config_command()? {
914                        QueryExpr::ConfigCommand(command) => {
915                            Ok(FrontendStatement::ConfigCommand(command))
916                        }
917                        other => Err(ParseError::new(
918                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
919                            self.position(),
920                        )),
921                    }
922                } else {
923                    self.advance()?;
924                    match self.parse_kv_invalidate_tags_after_invalidate()? {
925                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
926                        other => Err(ParseError::new(
927                            format!(
928                                "internal: INVALIDATE produced unexpected query kind {other:?}"
929                            ),
930                            self.position(),
931                        )),
932                    }
933                }
934            }
935            Token::Attach | Token::Detach => self.parse_sql_statement().map(FrontendStatement::Sql),
936            Token::Match => match self.parse_match_query()? {
937                QueryExpr::Graph(query) => Ok(FrontendStatement::Graph(query)),
938                other => Err(ParseError::new(
939                    format!("internal: MATCH produced unexpected query kind {other:?}"),
940                    self.position(),
941                )),
942            },
943            Token::Path => match self.parse_path_query()? {
944                QueryExpr::Path(query) => Ok(FrontendStatement::Path(query)),
945                other => Err(ParseError::new(
946                    format!("internal: PATH produced unexpected query kind {other:?}"),
947                    self.position(),
948                )),
949            },
950            Token::Vector => match self.parse_vector_query()? {
951                QueryExpr::Vector(query) => Ok(FrontendStatement::Vector(query)),
952                other => Err(ParseError::new(
953                    format!("internal: VECTOR produced unexpected query kind {other:?}"),
954                    self.position(),
955                )),
956            },
957            Token::Hybrid => match self.parse_hybrid_query()? {
958                QueryExpr::Hybrid(query) => Ok(FrontendStatement::Hybrid(query)),
959                other => Err(ParseError::new(
960                    format!("internal: HYBRID produced unexpected query kind {other:?}"),
961                    self.position(),
962                )),
963            },
964            Token::Graph => match self.parse_graph_command()? {
965                QueryExpr::GraphCommand(command) => Ok(FrontendStatement::GraphCommand(command)),
966                other => Err(ParseError::new(
967                    format!("internal: GRAPH produced unexpected query kind {other:?}"),
968                    self.position(),
969                )),
970            },
971            Token::Search => match self.parse_search_command()? {
972                QueryExpr::SearchCommand(command) => Ok(FrontendStatement::Search(command)),
973                other => Err(ParseError::new(
974                    format!("internal: SEARCH produced unexpected query kind {other:?}"),
975                    self.position(),
976                )),
977            },
978            Token::Ident(name) if name.eq_ignore_ascii_case("ASK") => {
979                match self.parse_ask_query()? {
980                    QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
981                    other => Err(ParseError::new(
982                        format!("internal: ASK produced unexpected query kind {other:?}"),
983                        self.position(),
984                    )),
985                }
986            }
987            Token::Ident(name) if name.eq_ignore_ascii_case("UNSEAL") => {
988                match self.parse_unseal_vault_command()? {
989                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
990                    other => Err(ParseError::new(
991                        format!("internal: UNSEAL VAULT produced unexpected query kind {other:?}"),
992                        self.position(),
993                    )),
994                }
995            }
996            Token::Queue => match self.parse_queue_command()? {
997                QueryExpr::QueueCommand(command) => Ok(FrontendStatement::QueueCommand(command)),
998                other => Err(ParseError::new(
999                    format!("internal: QUEUE produced unexpected query kind {other:?}"),
1000                    self.position(),
1001                )),
1002            },
1003            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
1004                match self.parse_events_command()? {
1005                    QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
1006                        SqlQuery::Select(query),
1007                    ))),
1008                    QueryExpr::EventsBackfill(query) => {
1009                        Ok(FrontendStatement::EventsBackfill(query))
1010                    }
1011                    QueryExpr::EventsBackfillStatus { collection } => {
1012                        Ok(FrontendStatement::EventsBackfillStatus { collection })
1013                    }
1014                    other => Err(ParseError::new(
1015                        format!("internal: EVENTS produced unexpected query kind {other:?}"),
1016                        self.position(),
1017                    )),
1018                }
1019            }
1020            Token::Kv => match self.parse_kv_command()? {
1021                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1022                other => Err(ParseError::new(
1023                    format!("internal: KV produced unexpected query kind {other:?}"),
1024                    self.position(),
1025                )),
1026            },
1027            Token::Delete => {
1028                if matches!(
1029                    self.peek_next()?,
1030                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
1031                ) {
1032                    match self.parse_config_command()? {
1033                        QueryExpr::ConfigCommand(command) => {
1034                            Ok(FrontendStatement::ConfigCommand(command))
1035                        }
1036                        other => Err(ParseError::new(
1037                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1038                            self.position(),
1039                        )),
1040                    }
1041                } else if matches!(
1042                    self.peek_next()?,
1043                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
1044                ) {
1045                    match self.parse_vault_lifecycle_command()? {
1046                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1047                        other => Err(ParseError::new(
1048                            format!("internal: VAULT produced unexpected query kind {other:?}"),
1049                            self.position(),
1050                        )),
1051                    }
1052                } else {
1053                    self.parse_sql_statement().map(FrontendStatement::Sql)
1054                }
1055            }
1056            Token::Add => match self.parse_config_command()? {
1057                QueryExpr::ConfigCommand(command) => Ok(FrontendStatement::ConfigCommand(command)),
1058                other => Err(ParseError::new(
1059                    format!("internal: CONFIG produced unexpected query kind {other:?}"),
1060                    self.position(),
1061                )),
1062            },
1063            Token::Purge => match self.parse_vault_lifecycle_command()? {
1064                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1065                other => Err(ParseError::new(
1066                    format!("internal: VAULT produced unexpected query kind {other:?}"),
1067                    self.position(),
1068                )),
1069            },
1070            Token::Ident(name)
1071                if name.eq_ignore_ascii_case("PUT")
1072                    || name.eq_ignore_ascii_case("GET")
1073                    || name.eq_ignore_ascii_case("RESOLVE")
1074                    || name.eq_ignore_ascii_case("ROTATE")
1075                    || name.eq_ignore_ascii_case("HISTORY")
1076                    || name.eq_ignore_ascii_case("PURGE")
1077                    || name.eq_ignore_ascii_case("INCR")
1078                    || name.eq_ignore_ascii_case("DECR")
1079                    || name.eq_ignore_ascii_case("INVALIDATE") =>
1080            {
1081                if matches!(
1082                    self.peek_next()?,
1083                    Token::Ident(next) if next.eq_ignore_ascii_case("VAULT")
1084                ) {
1085                    match self.parse_vault_lifecycle_command()? {
1086                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1087                        other => Err(ParseError::new(
1088                            format!("internal: VAULT produced unexpected query kind {other:?}"),
1089                            self.position(),
1090                        )),
1091                    }
1092                } else {
1093                    match self.parse_config_command()? {
1094                        QueryExpr::ConfigCommand(command) => {
1095                            Ok(FrontendStatement::ConfigCommand(command))
1096                        }
1097                        other => Err(ParseError::new(
1098                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1099                            self.position(),
1100                        )),
1101                    }
1102                }
1103            }
1104            Token::Ident(name) if name.eq_ignore_ascii_case("VAULT") => {
1105                match self.parse_vault_command()? {
1106                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1107                    other => Err(ParseError::new(
1108                        format!("internal: VAULT produced unexpected query kind {other:?}"),
1109                        self.position(),
1110                    )),
1111                }
1112            }
1113            Token::Tree => match self.parse_tree_command()? {
1114                QueryExpr::TreeCommand(command) => Ok(FrontendStatement::TreeCommand(command)),
1115                other => Err(ParseError::new(
1116                    format!("internal: TREE produced unexpected query kind {other:?}"),
1117                    self.position(),
1118                )),
1119            },
1120            Token::Ident(name) if name.eq_ignore_ascii_case("HLL") => {
1121                match self.parse_hll_command()? {
1122                    QueryExpr::ProbabilisticCommand(command) => {
1123                        Ok(FrontendStatement::ProbabilisticCommand(command))
1124                    }
1125                    other => Err(ParseError::new(
1126                        format!("internal: HLL produced unexpected query kind {other:?}"),
1127                        self.position(),
1128                    )),
1129                }
1130            }
1131            Token::Ident(name) if name.eq_ignore_ascii_case("SKETCH") => {
1132                match self.parse_sketch_command()? {
1133                    QueryExpr::ProbabilisticCommand(command) => {
1134                        Ok(FrontendStatement::ProbabilisticCommand(command))
1135                    }
1136                    other => Err(ParseError::new(
1137                        format!("internal: SKETCH produced unexpected query kind {other:?}"),
1138                        self.position(),
1139                    )),
1140                }
1141            }
1142            Token::Ident(name) if name.eq_ignore_ascii_case("FILTER") => {
1143                match self.parse_filter_command()? {
1144                    QueryExpr::ProbabilisticCommand(command) => {
1145                        Ok(FrontendStatement::ProbabilisticCommand(command))
1146                    }
1147                    other => Err(ParseError::new(
1148                        format!("internal: FILTER produced unexpected query kind {other:?}"),
1149                        self.position(),
1150                    )),
1151                }
1152            }
1153            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => self
1154                .parse_sql_command()
1155                .map(SqlCommand::into_statement)
1156                .map(FrontendStatement::Sql),
1157            other => Err(ParseError::expected(
1158                vec![
1159                    "SELECT", "MATCH", "PATH", "FROM", "VECTOR", "HYBRID", "INSERT", "UPDATE",
1160                    "DELETE", "TRUNCATE", "CREATE", "DROP", "ALTER", "GRAPH", "SEARCH", "ASK",
1161                    "QUEUE", "EVENTS", "KV", "HLL", "TREE", "SKETCH", "FILTER", "SET", "SHOW",
1162                ],
1163                other,
1164                self.position(),
1165            )),
1166        }
1167    }
1168
1169    /// Parse any SQL/RQL-style command into the canonical SQL frontend IR.
1170    pub fn parse_sql_statement(&mut self) -> Result<SqlStatement, ParseError> {
1171        self.parse_sql_command().map(SqlCommand::into_statement)
1172    }
1173
1174    fn parse_dotted_admin_path(&mut self, lowercase: bool) -> Result<String, ParseError> {
1175        let mut path = self.expect_ident()?;
1176        while self.consume(&Token::Dot)? {
1177            let next = self.expect_ident_or_keyword()?;
1178            path = format!("{path}.{next}");
1179        }
1180        Ok(if lowercase {
1181            path.to_ascii_lowercase()
1182        } else {
1183            path
1184        })
1185    }
1186
1187    /// Parse any SQL/RQL-style command through a single frontend module.
1188    pub fn parse_sql_command(&mut self) -> Result<SqlCommand, ParseError> {
1189        match self.peek() {
1190            Token::Select => match self.parse_select_query()? {
1191                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1192                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1193                other => Err(ParseError::new(
1194                    format!("internal: SELECT produced unexpected query kind {other:?}"),
1195                    self.position(),
1196                )),
1197            },
1198            Token::From => match self.parse_from_query()? {
1199                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1200                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1201                other => Err(ParseError::new(
1202                    format!("internal: FROM produced unexpected query kind {other:?}"),
1203                    self.position(),
1204                )),
1205            },
1206            Token::Insert => match self.parse_insert_query()? {
1207                QueryExpr::Insert(query) => Ok(SqlCommand::Insert(query)),
1208                other => Err(ParseError::new(
1209                    format!("internal: INSERT produced unexpected query kind {other:?}"),
1210                    self.position(),
1211                )),
1212            },
1213            Token::Update => match self.parse_update_query()? {
1214                QueryExpr::Update(query) => Ok(SqlCommand::Update(query)),
1215                other => Err(ParseError::new(
1216                    format!("internal: UPDATE produced unexpected query kind {other:?}"),
1217                    self.position(),
1218                )),
1219            },
1220            Token::Delete => {
1221                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("SECRET"))
1222                {
1223                    self.advance()?; // DELETE
1224                    self.advance()?; // SECRET
1225                    let key = self.parse_dotted_admin_path(true)?;
1226                    Ok(SqlCommand::DeleteSecret { key })
1227                } else {
1228                    match self.parse_delete_query()? {
1229                        QueryExpr::Delete(query) => Ok(SqlCommand::Delete(query)),
1230                        other => Err(ParseError::new(
1231                            format!("internal: DELETE produced unexpected query kind {other:?}"),
1232                            self.position(),
1233                        )),
1234                    }
1235                }
1236            }
1237            Token::Truncate => {
1238                self.advance()?;
1239                let model = if self.consume(&Token::Table)? {
1240                    Some(CollectionModel::Table)
1241                } else if self.consume(&Token::Graph)? {
1242                    Some(CollectionModel::Graph)
1243                } else if self.consume(&Token::Vector)? {
1244                    Some(CollectionModel::Vector)
1245                } else if self.consume(&Token::Document)? {
1246                    Some(CollectionModel::Document)
1247                } else if self.consume(&Token::Timeseries)? {
1248                    Some(CollectionModel::TimeSeries)
1249                } else if self.consume(&Token::Kv)? {
1250                    Some(CollectionModel::Kv)
1251                } else if self.consume(&Token::Queue)? {
1252                    Some(CollectionModel::Queue)
1253                } else if self.consume(&Token::Collection)? {
1254                    None
1255                } else {
1256                    return Err(ParseError::expected(
1257                        vec![
1258                            "TABLE",
1259                            "GRAPH",
1260                            "VECTOR",
1261                            "DOCUMENT",
1262                            "TIMESERIES",
1263                            "KV",
1264                            "QUEUE",
1265                            "COLLECTION",
1266                        ],
1267                        self.peek(),
1268                        self.position(),
1269                    ));
1270                };
1271                match self.parse_truncate_body(model)? {
1272                    QueryExpr::Truncate(query) => Ok(SqlCommand::Truncate(query)),
1273                    other => Err(ParseError::new(
1274                        format!("internal: TRUNCATE produced unexpected kind {other:?}"),
1275                        self.position(),
1276                    )),
1277                }
1278            }
1279            Token::Explain => {
1280                // Peek ahead: EXPLAIN MIGRATION name → ExplainMigration
1281                // EXPLAIN ALTER FOR ... → ExplainAlter (existing path)
1282                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
1283                {
1284                    self.advance()?; // consume EXPLAIN
1285                    match self.parse_explain_migration_after_keyword()? {
1286                        QueryExpr::ExplainMigration(q) => Ok(SqlCommand::ExplainMigration(q)),
1287                        other => Err(ParseError::new(
1288                            format!(
1289                                "internal: EXPLAIN MIGRATION produced unexpected kind {other:?}"
1290                            ),
1291                            self.position(),
1292                        )),
1293                    }
1294                } else {
1295                    match self.parse_explain_alter_query()? {
1296                        QueryExpr::ExplainAlter(query) => Ok(SqlCommand::ExplainAlter(query)),
1297                        other => Err(ParseError::new(
1298                            format!("internal: EXPLAIN produced unexpected query kind {other:?}"),
1299                            self.position(),
1300                        )),
1301                    }
1302                }
1303            }
1304            Token::Create => {
1305                let pos = self.position();
1306                self.advance()?;
1307
1308                // CREATE [OR REPLACE] [MATERIALIZED] VIEW [IF NOT EXISTS] name AS <select>
1309                // Detect the VIEW path early so OR REPLACE / MATERIALIZED modifiers
1310                // don't collide with other CREATE variants (TABLE, INDEX, etc.).
1311                let mut or_replace = false;
1312                if self.consume(&Token::Or)? || self.consume_ident_ci("OR")? {
1313                    let _ = self.consume_ident_ci("REPLACE")?;
1314                    or_replace = true;
1315                }
1316                let materialized = self.consume(&Token::Materialized)?;
1317                if self.check(&Token::View) {
1318                    self.advance()?;
1319                    let if_not_exists = self.match_if_not_exists()?;
1320                    let name = self.expect_ident()?;
1321                    // Accept `AS` — the lexer promotes it to `Token::As`
1322                    // (keyword) but some paths still see it as an ident.
1323                    if !self.consume(&Token::As)? && !self.consume_ident_ci("AS")? {
1324                        return Err(ParseError::expected(
1325                            vec!["AS"],
1326                            self.peek(),
1327                            self.position(),
1328                        ));
1329                    }
1330                    // Recursive parse of the body. Any QueryExpr that the
1331                    // rest of the grammar accepts is valid (Select, Join, etc.).
1332                    let body = self.parse_sql_command()?.into_query_expr();
1333                    return Ok(SqlCommand::CreateView(CreateViewQuery {
1334                        name,
1335                        query: Box::new(body),
1336                        materialized,
1337                        if_not_exists,
1338                        or_replace,
1339                    }));
1340                }
1341                // If OR REPLACE / MATERIALIZED was consumed but VIEW was not,
1342                // bail out — no other CREATE form accepts those modifiers.
1343                if or_replace || materialized {
1344                    return Err(ParseError::expected(
1345                        vec!["VIEW"],
1346                        self.peek(),
1347                        self.position(),
1348                    ));
1349                }
1350
1351                if self.check(&Token::Index) || self.check(&Token::Unique) {
1352                    match self.parse_create_index_query()? {
1353                        QueryExpr::CreateIndex(query) => Ok(SqlCommand::CreateIndex(query)),
1354                        other => Err(ParseError::new(
1355                            format!("internal: CREATE INDEX produced unexpected kind {other:?}"),
1356                            self.position(),
1357                        )),
1358                    }
1359                } else if self.check(&Token::Table) {
1360                    self.expect(Token::Table)?;
1361                    match self.parse_create_table_body()? {
1362                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1363                        other => Err(ParseError::new(
1364                            format!("internal: CREATE TABLE produced unexpected kind {other:?}"),
1365                            self.position(),
1366                        )),
1367                    }
1368                } else if self.check(&Token::Graph) {
1369                    self.advance()?;
1370                    match self.parse_create_collection_model_body(CollectionModel::Graph)? {
1371                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1372                        other => Err(ParseError::new(
1373                            format!("internal: CREATE GRAPH produced unexpected kind {other:?}"),
1374                            self.position(),
1375                        )),
1376                    }
1377                } else if self.check(&Token::Document) {
1378                    self.advance()?;
1379                    match self.parse_create_collection_model_body(CollectionModel::Document)? {
1380                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1381                        other => Err(ParseError::new(
1382                            format!("internal: CREATE DOCUMENT produced unexpected kind {other:?}"),
1383                            self.position(),
1384                        )),
1385                    }
1386                } else if self.check(&Token::Vector) {
1387                    self.advance()?;
1388                    match self.parse_create_vector_body()? {
1389                        QueryExpr::CreateVector(query) => Ok(SqlCommand::CreateVector(query)),
1390                        other => Err(ParseError::new(
1391                            format!("internal: CREATE VECTOR produced unexpected kind {other:?}"),
1392                            self.position(),
1393                        )),
1394                    }
1395                } else if self.check(&Token::Collection) {
1396                    self.advance()?;
1397                    match self.parse_create_collection_body()? {
1398                        QueryExpr::CreateCollection(query) => {
1399                            Ok(SqlCommand::CreateCollection(query))
1400                        }
1401                        other => Err(ParseError::new(
1402                            format!(
1403                                "internal: CREATE COLLECTION produced unexpected kind {other:?}"
1404                            ),
1405                            self.position(),
1406                        )),
1407                    }
1408                } else if self.check(&Token::Kv) {
1409                    self.advance()?;
1410                    match self.parse_create_keyed_body(CollectionModel::Kv)? {
1411                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1412                        other => Err(ParseError::new(
1413                            format!("internal: CREATE KV produced unexpected kind {other:?}"),
1414                            self.position(),
1415                        )),
1416                    }
1417                } else if self.consume_ident_ci("CONFIG")? {
1418                    match self.parse_create_keyed_body(CollectionModel::Config)? {
1419                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1420                        other => Err(ParseError::new(
1421                            format!("internal: CREATE CONFIG produced unexpected kind {other:?}"),
1422                            self.position(),
1423                        )),
1424                    }
1425                } else if self.consume_ident_ci("VAULT")? {
1426                    match self.parse_create_keyed_body(CollectionModel::Vault)? {
1427                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1428                        other => Err(ParseError::new(
1429                            format!("internal: CREATE VAULT produced unexpected kind {other:?}"),
1430                            self.position(),
1431                        )),
1432                    }
1433                } else if self.check(&Token::Timeseries) {
1434                    self.advance()?;
1435                    match self.parse_create_timeseries_body()? {
1436                        QueryExpr::CreateTimeSeries(query) => {
1437                            Ok(SqlCommand::CreateTimeSeries(query))
1438                        }
1439                        other => Err(ParseError::new(
1440                            format!(
1441                                "internal: CREATE TIMESERIES produced unexpected kind {other:?}"
1442                            ),
1443                            self.position(),
1444                        )),
1445                    }
1446                } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
1447                {
1448                    self.advance()?;
1449                    match self.parse_create_hypertable_body()? {
1450                        QueryExpr::CreateTimeSeries(query) => {
1451                            Ok(SqlCommand::CreateTimeSeries(query))
1452                        }
1453                        other => Err(ParseError::new(
1454                            format!(
1455                                "internal: CREATE HYPERTABLE produced unexpected kind {other:?}"
1456                            ),
1457                            self.position(),
1458                        )),
1459                    }
1460                } else if self.check(&Token::Queue) {
1461                    self.advance()?;
1462                    match self.parse_create_queue_body()? {
1463                        QueryExpr::CreateQueue(query) => Ok(SqlCommand::CreateQueue(query)),
1464                        other => Err(ParseError::new(
1465                            format!("internal: CREATE QUEUE produced unexpected kind {other:?}"),
1466                            self.position(),
1467                        )),
1468                    }
1469                } else if self.check(&Token::Tree) {
1470                    self.advance()?;
1471                    match self.parse_create_tree_body()? {
1472                        QueryExpr::CreateTree(query) => Ok(SqlCommand::CreateTree(query)),
1473                        other => Err(ParseError::new(
1474                            format!("internal: CREATE TREE produced unexpected kind {other:?}"),
1475                            self.position(),
1476                        )),
1477                    }
1478                } else if matches!(self.peek(), Token::Ident(n) if
1479                    n.eq_ignore_ascii_case("HLL") ||
1480                    n.eq_ignore_ascii_case("SKETCH") ||
1481                    n.eq_ignore_ascii_case("FILTER"))
1482                {
1483                    match self.parse_create_probabilistic()? {
1484                        QueryExpr::ProbabilisticCommand(command) => {
1485                            Ok(SqlCommand::Probabilistic(command))
1486                        }
1487                        other => Err(ParseError::new(
1488                            format!(
1489                                "internal: CREATE probabilistic produced unexpected kind {other:?}"
1490                            ),
1491                            self.position(),
1492                        )),
1493                    }
1494                } else if self.check(&Token::Schema) {
1495                    // CREATE SCHEMA [IF NOT EXISTS] name
1496                    self.advance()?;
1497                    let if_not_exists = self.match_if_not_exists()?;
1498                    let name = self.expect_ident()?;
1499                    Ok(SqlCommand::CreateSchema(CreateSchemaQuery {
1500                        name,
1501                        if_not_exists,
1502                    }))
1503                } else if self.check(&Token::Policy) {
1504                    // Two forms share the leading `CREATE POLICY` tokens:
1505                    //   * IAM:   CREATE POLICY '<id>' AS '<json>'          (string literal id)
1506                    //   * RLS:   CREATE POLICY <name> ON <target> ...      (bare ident name)
1507                    // Disambiguate by peeking the token after POLICY.
1508                    self.advance()?;
1509                    if matches!(self.peek(), Token::String(_)) {
1510                        // IAM form — short-circuit out of the SQL command stack.
1511                        let expr = self.parse_create_iam_policy_after_keywords()?;
1512                        // Wrap the parsed IAM policy expression in the dedicated
1513                        // `SqlCommand::IamPolicy` variant and return right away.
1514                        // The early return skips the RLS parsing below
1515                        // (`CREATE POLICY <name> ON <target> ...`), which only
1516                        // applies when the token after POLICY is not a string literal.
1517                        return Ok(SqlCommand::IamPolicy(expr));
1518                    }
1519                    let name = self.expect_ident()?;
1520                    self.expect(Token::On)?;
1521
1522                    let (target_kind, table) = {
1523                        use crate::storage::query::ast::PolicyTargetKind;
1524                        let kw = match self.peek() {
1525                            Token::Ident(s) => Some(s.to_ascii_uppercase()),
1526                            _ => None,
1527                        };
1528                        let kind = kw.as_deref().and_then(|k| match k {
1529                            "NODES" => Some(PolicyTargetKind::Nodes),
1530                            "EDGES" => Some(PolicyTargetKind::Edges),
1531                            "VECTORS" => Some(PolicyTargetKind::Vectors),
1532                            "MESSAGES" => Some(PolicyTargetKind::Messages),
1533                            "POINTS" => Some(PolicyTargetKind::Points),
1534                            "DOCUMENTS" => Some(PolicyTargetKind::Documents),
1535                            _ => None,
1536                        });
1537                        if let Some(k) = kind {
1538                            self.advance()?;
1539                            self.expect(Token::Of)?;
1540                            let coll = self.expect_ident()?;
1541                            (k, coll)
1542                        } else {
1543                            let coll = self.expect_ident()?;
1544                            (PolicyTargetKind::Table, coll)
1545                        }
1546                    };
1547
1548                    let action = if self.consume(&Token::For)? {
1549                        let a = match self.peek() {
1550                            Token::Select => {
1551                                self.advance()?;
1552                                Some(PolicyAction::Select)
1553                            }
1554                            Token::Insert => {
1555                                self.advance()?;
1556                                Some(PolicyAction::Insert)
1557                            }
1558                            Token::Update => {
1559                                self.advance()?;
1560                                Some(PolicyAction::Update)
1561                            }
1562                            Token::Delete => {
1563                                self.advance()?;
1564                                Some(PolicyAction::Delete)
1565                            }
1566                            Token::All => {
1567                                self.advance()?;
1568                                None
1569                            }
1570                            _ => None,
1571                        };
1572                        a
1573                    } else {
1574                        None
1575                    };
1576
1577                    let role = if self.consume(&Token::To)? {
1578                        Some(self.expect_ident()?)
1579                    } else {
1580                        None
1581                    };
1582
1583                    self.expect(Token::Using)?;
1584                    self.expect(Token::LParen)?;
1585                    let filter = self.parse_filter()?;
1586                    self.expect(Token::RParen)?;
1587
1588                    Ok(SqlCommand::CreatePolicy(CreatePolicyQuery {
1589                        name,
1590                        table,
1591                        action,
1592                        role,
1593                        using: Box::new(filter),
1594                        target_kind,
1595                    }))
1596                } else if self.check(&Token::Server) {
1597                    // CREATE SERVER [IF NOT EXISTS] name
1598                    //   FOREIGN DATA WRAPPER kind
1599                    //   [OPTIONS (key 'value', ...)]
1600                    self.advance()?;
1601                    let if_not_exists = self.match_if_not_exists()?;
1602                    let name = self.expect_ident()?;
1603                    self.expect(Token::Foreign)?;
1604                    self.expect(Token::Data)?;
1605                    self.expect(Token::Wrapper)?;
1606                    let wrapper = self.expect_ident()?;
1607                    let options = self.parse_fdw_options_clause()?;
1608                    Ok(SqlCommand::CreateServer(CreateServerQuery {
1609                        name,
1610                        wrapper,
1611                        options,
1612                        if_not_exists,
1613                    }))
1614                } else if self.check(&Token::Foreign) {
1615                    // CREATE FOREIGN TABLE [IF NOT EXISTS] name (cols)
1616                    //   SERVER server_name
1617                    //   [OPTIONS (key 'value', ...)]
1618                    self.advance()?;
1619                    self.expect(Token::Table)?;
1620                    let if_not_exists = self.match_if_not_exists()?;
1621                    let name = self.expect_ident()?;
1622                    self.expect(Token::LParen)?;
1623                    let mut columns = Vec::new();
1624                    loop {
1625                        let col_name = self.expect_ident()?;
1626                        let data_type = self.expect_ident_or_keyword()?;
1627                        // Inline NOT NULL check — the CREATE TABLE path's helper is
1628                        // private and coupling to it just for FDW columns isn't worth it.
1629                        let mut not_null = false;
1630                        if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NOT")) {
1631                            self.advance()?;
1632                            if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NULL"))
1633                            {
1634                                self.advance()?;
1635                                not_null = true;
1636                            }
1637                        }
1638                        columns.push(ForeignColumnDef {
1639                            name: col_name,
1640                            data_type,
1641                            not_null,
1642                        });
1643                        if !self.consume(&Token::Comma)? {
1644                            break;
1645                        }
1646                    }
1647                    self.expect(Token::RParen)?;
1648                    self.expect(Token::Server)?;
1649                    let server = self.expect_ident()?;
1650                    let options = self.parse_fdw_options_clause()?;
1651                    Ok(SqlCommand::CreateForeignTable(CreateForeignTableQuery {
1652                        name,
1653                        server,
1654                        columns,
1655                        options,
1656                        if_not_exists,
1657                    }))
1658                } else if self.check(&Token::Sequence) {
1659                    // CREATE SEQUENCE [IF NOT EXISTS] name
1660                    //   [START [WITH] n] [INCREMENT [BY] n]
1661                    self.advance()?;
1662                    let if_not_exists = self.match_if_not_exists()?;
1663                    let name = self.expect_ident()?;
1664                    let mut start: i64 = 1;
1665                    let mut increment: i64 = 1;
1666                    // Loop over optional clauses in any order.
1667                    loop {
1668                        if self.consume(&Token::Start)? {
1669                            // Accept `START 100` or `START WITH 100`.
1670                            let _ = self.consume(&Token::With)? || self.consume_ident_ci("WITH")?;
1671                            start = self.parse_integer()?;
1672                        } else if self.consume(&Token::Increment)? {
1673                            // Accept `INCREMENT 5` or `INCREMENT BY 5`.
1674                            let _ = self.consume(&Token::By)? || self.consume_ident_ci("BY")?;
1675                            increment = self.parse_integer()?;
1676                        } else {
1677                            break;
1678                        }
1679                    }
1680                    Ok(SqlCommand::CreateSequence(CreateSequenceQuery {
1681                        name,
1682                        if_not_exists,
1683                        start,
1684                        increment,
1685                    }))
1686                } else if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
1687                {
1688                    self.advance()?; // consume MIGRATION
1689                    match self.parse_create_migration_body()? {
1690                        QueryExpr::CreateMigration(q) => Ok(SqlCommand::CreateMigration(q)),
1691                        other => Err(ParseError::new(
1692                            format!(
1693                                "internal: CREATE MIGRATION produced unexpected kind {other:?}"
1694                            ),
1695                            self.position(),
1696                        )),
1697                    }
1698                } else if let Some(err) =
1699                    ParseError::unsupported_recognized_token(self.peek(), self.position())
1700                {
1701                    Err(err)
1702                } else {
1703                    Err(ParseError::expected(
1704                        vec![
1705                            "TABLE",
1706                            "GRAPH",
1707                            "VECTOR",
1708                            "DOCUMENT",
1709                            "KV",
1710                            "COLLECTION",
1711                            "INDEX",
1712                            "UNIQUE",
1713                            "TIMESERIES",
1714                            "QUEUE",
1715                            "TREE",
1716                            "HLL",
1717                            "SKETCH",
1718                            "FILTER",
1719                            "SCHEMA",
1720                            "SEQUENCE",
1721                            "MIGRATION",
1722                        ],
1723                        self.peek(),
1724                        pos,
1725                    ))
1726                }
1727            }
1728            Token::Drop => {
1729                let pos = self.position();
1730                self.advance()?;
1731
1732                // DROP [MATERIALIZED] VIEW [IF EXISTS] name
1733                let materialized = self.consume(&Token::Materialized)?;
1734                if self.check(&Token::View) {
1735                    self.advance()?;
1736                    let if_exists = self.match_if_exists()?;
1737                    let name = self.expect_ident()?;
1738                    return Ok(SqlCommand::DropView(DropViewQuery {
1739                        name,
1740                        materialized,
1741                        if_exists,
1742                    }));
1743                }
1744                if materialized {
1745                    return Err(ParseError::expected(
1746                        vec!["VIEW"],
1747                        self.peek(),
1748                        self.position(),
1749                    ));
1750                }
1751
1752                if self.check(&Token::Index) {
1753                    match self.parse_drop_index_query()? {
1754                        QueryExpr::DropIndex(query) => Ok(SqlCommand::DropIndex(query)),
1755                        other => Err(ParseError::new(
1756                            format!("internal: DROP INDEX produced unexpected kind {other:?}"),
1757                            self.position(),
1758                        )),
1759                    }
1760                } else if self.check(&Token::Table) {
1761                    self.expect(Token::Table)?;
1762                    match self.parse_drop_table_body()? {
1763                        QueryExpr::DropTable(query) => Ok(SqlCommand::DropTable(query)),
1764                        other => Err(ParseError::new(
1765                            format!("internal: DROP TABLE produced unexpected kind {other:?}"),
1766                            self.position(),
1767                        )),
1768                    }
1769                } else if self.check(&Token::Graph) {
1770                    self.advance()?;
1771                    match self.parse_drop_graph_body()? {
1772                        QueryExpr::DropGraph(query) => Ok(SqlCommand::DropGraph(query)),
1773                        other => Err(ParseError::new(
1774                            format!("internal: DROP GRAPH produced unexpected kind {other:?}"),
1775                            self.position(),
1776                        )),
1777                    }
1778                } else if self.check(&Token::Vector) {
1779                    self.advance()?;
1780                    match self.parse_drop_vector_body()? {
1781                        QueryExpr::DropVector(query) => Ok(SqlCommand::DropVector(query)),
1782                        other => Err(ParseError::new(
1783                            format!("internal: DROP VECTOR produced unexpected kind {other:?}"),
1784                            self.position(),
1785                        )),
1786                    }
1787                } else if self.check(&Token::Document) {
1788                    self.advance()?;
1789                    match self.parse_drop_document_body()? {
1790                        QueryExpr::DropDocument(query) => Ok(SqlCommand::DropDocument(query)),
1791                        other => Err(ParseError::new(
1792                            format!("internal: DROP DOCUMENT produced unexpected kind {other:?}"),
1793                            self.position(),
1794                        )),
1795                    }
1796                } else if self.check(&Token::Kv) {
1797                    self.advance()?;
1798                    match self.parse_drop_kv_body()? {
1799                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1800                        other => Err(ParseError::new(
1801                            format!("internal: DROP KV produced unexpected kind {other:?}"),
1802                            self.position(),
1803                        )),
1804                    }
1805                } else if self.consume_ident_ci("CONFIG")? {
1806                    match self.parse_drop_keyed_body(CollectionModel::Config)? {
1807                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1808                        other => Err(ParseError::new(
1809                            format!("internal: DROP CONFIG produced unexpected kind {other:?}"),
1810                            self.position(),
1811                        )),
1812                    }
1813                } else if self.consume_ident_ci("VAULT")? {
1814                    match self.parse_drop_keyed_body(CollectionModel::Vault)? {
1815                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1816                        other => Err(ParseError::new(
1817                            format!("internal: DROP VAULT produced unexpected kind {other:?}"),
1818                            self.position(),
1819                        )),
1820                    }
1821                } else if self.check(&Token::Collection) {
1822                    self.advance()?;
1823                    match self.parse_drop_collection_body()? {
1824                        QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
1825                        other => Err(ParseError::new(
1826                            format!("internal: DROP COLLECTION produced unexpected kind {other:?}"),
1827                            self.position(),
1828                        )),
1829                    }
1830                } else if self.check(&Token::Timeseries) {
1831                    self.advance()?;
1832                    match self.parse_drop_timeseries_body()? {
1833                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
1834                        other => Err(ParseError::new(
1835                            format!("internal: DROP TIMESERIES produced unexpected kind {other:?}"),
1836                            self.position(),
1837                        )),
1838                    }
1839                } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
1840                {
1841                    // DROP HYPERTABLE name reuses the same AST as
1842                    // DROP TIMESERIES — runtime clears the registry
1843                    // entry *and* drops the backing collection.
1844                    self.advance()?;
1845                    match self.parse_drop_timeseries_body()? {
1846                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
1847                        other => Err(ParseError::new(
1848                            format!("internal: DROP HYPERTABLE produced unexpected kind {other:?}"),
1849                            self.position(),
1850                        )),
1851                    }
1852                } else if self.check(&Token::Queue) {
1853                    self.advance()?;
1854                    match self.parse_drop_queue_body()? {
1855                        QueryExpr::DropQueue(query) => Ok(SqlCommand::DropQueue(query)),
1856                        other => Err(ParseError::new(
1857                            format!("internal: DROP QUEUE produced unexpected kind {other:?}"),
1858                            self.position(),
1859                        )),
1860                    }
1861                } else if self.check(&Token::Tree) {
1862                    self.advance()?;
1863                    match self.parse_drop_tree_body()? {
1864                        QueryExpr::DropTree(query) => Ok(SqlCommand::DropTree(query)),
1865                        other => Err(ParseError::new(
1866                            format!("internal: DROP TREE produced unexpected kind {other:?}"),
1867                            self.position(),
1868                        )),
1869                    }
1870                } else if matches!(self.peek(), Token::Ident(n) if
1871                    n.eq_ignore_ascii_case("HLL") ||
1872                    n.eq_ignore_ascii_case("SKETCH") ||
1873                    n.eq_ignore_ascii_case("FILTER"))
1874                {
1875                    match self.parse_drop_probabilistic()? {
1876                        QueryExpr::ProbabilisticCommand(command) => {
1877                            Ok(SqlCommand::Probabilistic(command))
1878                        }
1879                        other => Err(ParseError::new(
1880                            format!(
1881                                "internal: DROP probabilistic produced unexpected kind {other:?}"
1882                            ),
1883                            self.position(),
1884                        )),
1885                    }
1886                } else if self.check(&Token::Schema) {
1887                    // DROP SCHEMA [IF EXISTS] name [CASCADE]
1888                    self.advance()?;
1889                    let if_exists = self.match_if_exists()?;
1890                    let name = self.expect_ident()?;
1891                    let cascade = self.consume(&Token::Cascade)?;
1892                    Ok(SqlCommand::DropSchema(DropSchemaQuery {
1893                        name,
1894                        if_exists,
1895                        cascade,
1896                    }))
1897                } else if self.check(&Token::Policy) {
1898                    // Two forms:
1899                    //   * IAM:   DROP POLICY '<id>'
1900                    //   * RLS:   DROP POLICY [IF EXISTS] name ON table
1901                    self.advance()?;
1902                    if matches!(self.peek(), Token::String(_)) {
1903                        let expr = self.parse_drop_iam_policy_after_keywords()?;
1904                        return Ok(SqlCommand::IamPolicy(expr));
1905                    }
1906                    let if_exists = self.match_if_exists()?;
1907                    let name = self.expect_ident()?;
1908                    self.expect(Token::On)?;
1909                    let table = self.expect_ident()?;
1910                    Ok(SqlCommand::DropPolicy(DropPolicyQuery {
1911                        name,
1912                        table,
1913                        if_exists,
1914                    }))
1915                } else if self.check(&Token::Server) {
1916                    // DROP SERVER [IF EXISTS] name [CASCADE]
1917                    self.advance()?;
1918                    let if_exists = self.match_if_exists()?;
1919                    let name = self.expect_ident()?;
1920                    let cascade = self.consume(&Token::Cascade)?;
1921                    Ok(SqlCommand::DropServer(DropServerQuery {
1922                        name,
1923                        if_exists,
1924                        cascade,
1925                    }))
1926                } else if self.check(&Token::Foreign) {
1927                    // DROP FOREIGN TABLE [IF EXISTS] name
1928                    self.advance()?;
1929                    self.expect(Token::Table)?;
1930                    let if_exists = self.match_if_exists()?;
1931                    let name = self.expect_ident()?;
1932                    Ok(SqlCommand::DropForeignTable(DropForeignTableQuery {
1933                        name,
1934                        if_exists,
1935                    }))
1936                } else if self.check(&Token::Sequence) {
1937                    // DROP SEQUENCE [IF EXISTS] name
1938                    self.advance()?;
1939                    let if_exists = self.match_if_exists()?;
1940                    let name = self.expect_ident()?;
1941                    Ok(SqlCommand::DropSequence(DropSequenceQuery {
1942                        name,
1943                        if_exists,
1944                    }))
1945                } else if let Some(err) =
1946                    ParseError::unsupported_recognized_token(self.peek(), self.position())
1947                {
1948                    Err(err)
1949                } else {
1950                    Err(ParseError::expected(
1951                        vec![
1952                            "TABLE",
1953                            "INDEX",
1954                            "TIMESERIES",
1955                            "QUEUE",
1956                            "TREE",
1957                            "HLL",
1958                            "SKETCH",
1959                            "FILTER",
1960                            "SCHEMA",
1961                            "SEQUENCE",
1962                        ],
1963                        self.peek(),
1964                        pos,
1965                    ))
1966                }
1967            }
1968            Token::Alter => {
1969                // Disambiguate ALTER USER / ALTER QUEUE / ALTER TABLE without
1970                // committing to a path until we've seen the target.
1971                // We peek the *next* token (without consuming) and
1972                // dispatch accordingly.
1973                let next = self.peek_next()?.clone();
1974                if matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("USER")) {
1975                    self.advance()?; // consume ALTER
1976                    let stmt = self.parse_alter_user_statement()?;
1977                    Ok(SqlCommand::AlterUser(stmt))
1978                } else if matches!(next, Token::Queue) {
1979                    self.advance()?; // consume ALTER
1980                    self.advance()?; // consume QUEUE
1981                    match self.parse_alter_queue_body()? {
1982                        QueryExpr::AlterQueue(query) => Ok(SqlCommand::AlterQueue(query)),
1983                        other => Err(ParseError::new(
1984                            format!("internal: ALTER QUEUE produced unexpected kind {other:?}"),
1985                            self.position(),
1986                        )),
1987                    }
1988                } else if matches!(next, Token::Table) {
1989                    match self.parse_alter_table_query()? {
1990                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
1991                        other => Err(ParseError::new(
1992                            format!(
1993                                "internal: ALTER TABLE produced unexpected query kind {other:?}"
1994                            ),
1995                            self.position(),
1996                        )),
1997                    }
1998                } else if let Some(err) =
1999                    ParseError::unsupported_recognized_token(&next, self.position())
2000                {
2001                    Err(err)
2002                } else {
2003                    match self.parse_alter_table_query()? {
2004                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2005                        other => Err(ParseError::new(
2006                            format!("internal: ALTER produced unexpected query kind {other:?}"),
2007                            self.position(),
2008                        )),
2009                    }
2010                }
2011            }
2012            Token::Ident(name) if name.eq_ignore_ascii_case("GRANT") => {
2013                let stmt = self.parse_grant_statement()?;
2014                Ok(SqlCommand::Grant(stmt))
2015            }
2016            Token::Ident(name) if name.eq_ignore_ascii_case("REVOKE") => {
2017                let stmt = self.parse_revoke_statement()?;
2018                Ok(SqlCommand::Revoke(stmt))
2019            }
2020            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
2021                self.advance()?;
2022                if self.consume_ident_ci("BACKFILL")? {
2023                    return Err(ParseError::new(
2024                        "EVENTS BACKFILL STATUS is not implemented; EVENTS BACKFILL runtime is available but durable progress tracking is not"
2025                            .to_string(),
2026                        self.position(),
2027                    ));
2028                }
2029                if !self.consume_ident_ci("STATUS")? {
2030                    return Err(ParseError::expected(
2031                        vec!["STATUS"],
2032                        self.peek(),
2033                        self.position(),
2034                    ));
2035                }
2036
2037                let mut query = TableQuery::new("red.subscriptions");
2038                let collection = match self.peek().clone() {
2039                    Token::Ident(name) => {
2040                        self.advance()?;
2041                        Some(name)
2042                    }
2043                    Token::String(name) => {
2044                        self.advance()?;
2045                        Some(name)
2046                    }
2047                    _ => None,
2048                };
2049                self.parse_table_clauses(&mut query)?;
2050                if let Some(collection) = collection {
2051                    let filter = Filter::compare(
2052                        FieldRef::column("red.subscriptions", "collection"),
2053                        CompareOp::Eq,
2054                        Value::text(collection),
2055                    );
2056                    let expr = filter_to_expr(&filter);
2057                    query.where_expr = Some(match query.where_expr.take() {
2058                        Some(existing) => Expr::binop(BinOp::And, existing, expr),
2059                        None => expr,
2060                    });
2061                    query.filter = Some(match query.filter.take() {
2062                        Some(existing) => existing.and(filter),
2063                        None => filter,
2064                    });
2065                }
2066                Ok(SqlCommand::Select(query))
2067            }
2068            Token::Attach => {
2069                let expr = self.parse_attach_policy()?;
2070                Ok(SqlCommand::IamPolicy(expr))
2071            }
2072            Token::Detach => {
2073                let expr = self.parse_detach_policy()?;
2074                Ok(SqlCommand::IamPolicy(expr))
2075            }
2076            Token::Ident(name) if name.eq_ignore_ascii_case("SIMULATE") => {
2077                let expr = self.parse_simulate_policy()?;
2078                Ok(SqlCommand::IamPolicy(expr))
2079            }
2080            Token::Set => {
2081                self.advance()?;
2082                if self.consume_ident_ci("CONFIG")? {
2083                    let full_key = self.parse_dotted_admin_path(true)?;
2084                    self.expect(Token::Eq)?;
2085                    let value = self.parse_literal_value()?;
2086                    Ok(SqlCommand::SetConfig {
2087                        key: full_key,
2088                        value,
2089                    })
2090                } else if self.consume_ident_ci("SECRET")? {
2091                    let key = self.parse_dotted_admin_path(true)?;
2092                    self.expect(Token::Eq)?;
2093                    let value = self.parse_literal_value()?;
2094                    Ok(SqlCommand::SetSecret { key, value })
2095                } else if self.consume_ident_ci("TENANT")? {
2096                    // SET TENANT 'id'  |  SET TENANT = 'id'  |
2097                    // SET TENANT NULL  |  SET TENANT = NULL
2098                    let _ = self.consume(&Token::Eq)?;
2099                    if self.consume_ident_ci("NULL")? {
2100                        Ok(SqlCommand::SetTenant(None))
2101                    } else {
2102                        let value = self.parse_literal_value()?;
2103                        match value {
2104                            Value::Text(s) => Ok(SqlCommand::SetTenant(Some(s.to_string()))),
2105                            Value::Null => Ok(SqlCommand::SetTenant(None)),
2106                            other => Err(ParseError::new(
2107                                format!("SET TENANT expects a text literal or NULL, got {other:?}"),
2108                                self.position(),
2109                            )),
2110                        }
2111                    }
2112                } else {
2113                    Err(ParseError::expected(
2114                        vec!["CONFIG", "SECRET", "TENANT"],
2115                        self.peek(),
2116                        self.position(),
2117                    ))
2118                }
2119            }
2120            Token::Ident(name) if name.eq_ignore_ascii_case("APPLY") => {
2121                self.advance()?;
2122                match self.parse_apply_migration()? {
2123                    QueryExpr::ApplyMigration(q) => Ok(SqlCommand::ApplyMigration(q)),
2124                    other => Err(ParseError::new(
2125                        format!("internal: APPLY MIGRATION produced unexpected kind {other:?}"),
2126                        self.position(),
2127                    )),
2128                }
2129            }
2130            Token::Ident(name) if name.eq_ignore_ascii_case("RESET") => {
2131                // RESET TENANT — session-local clear
2132                self.advance()?;
2133                if self.consume_ident_ci("TENANT")? {
2134                    Ok(SqlCommand::SetTenant(None))
2135                } else {
2136                    Err(ParseError::expected(
2137                        vec!["TENANT"],
2138                        self.peek(),
2139                        self.position(),
2140                    ))
2141                }
2142            }
2143            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
2144                self.advance()?;
2145                if self.consume_ident_ci("CONFIG")? {
2146                    // Accept dotted prefixes the same way SET CONFIG does
2147                    // (`SHOW CONFIG durability.mode`), and empty prefix
2148                    // (`SHOW CONFIG`) for a catalog-wide listing.
2149                    let prefix = if !self.check(&Token::Eof) {
2150                        let first = self.expect_ident()?;
2151                        let mut full = first;
2152                        while self.consume(&Token::Dot)? {
2153                            let next = self.expect_ident_or_keyword()?;
2154                            full = format!("{full}.{next}");
2155                        }
2156                        // Match SET CONFIG: lowercase so keyword segments
2157                        // come out consistent with the stored keys.
2158                        Some(full.to_ascii_lowercase())
2159                    } else {
2160                        None
2161                    };
2162                    Ok(SqlCommand::ShowConfig { prefix })
2163                } else if self.consume_ident_ci("COLLECTIONS")? {
2164                    let mut query = TableQuery::new("red.collections");
2165                    let include_internal = if self.consume_ident_ci("INCLUDING")? {
2166                        if !self.consume_ident_ci("INTERNAL")? {
2167                            return Err(ParseError::expected(
2168                                vec!["INTERNAL"],
2169                                self.peek(),
2170                                self.position(),
2171                            ));
2172                        }
2173                        true
2174                    } else {
2175                        false
2176                    };
2177                    self.parse_table_clauses(&mut query)?;
2178                    if !include_internal {
2179                        let user_filter = query.filter.take();
2180                        let hide_internal = crate::storage::query::ast::Filter::Compare {
2181                            field: FieldRef::column("", "internal"),
2182                            op: CompareOp::Eq,
2183                            value: Value::Boolean(false),
2184                        };
2185                        query.filter = Some(match user_filter {
2186                            Some(filter) => filter.and(hide_internal),
2187                            None => hide_internal,
2188                        });
2189                    }
2190                    Ok(SqlCommand::Select(query))
2191                } else if self.consume_ident_ci("TABLES")? {
2192                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2193                        self, "table",
2194                    )?))
2195                } else if self.consume_ident_ci("QUEUES")? {
2196                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2197                        self, "queue",
2198                    )?))
2199                } else if self.consume(&Token::Vectors)? || self.consume_ident_ci("VECTORS")? {
2200                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2201                        self, "vector",
2202                    )?))
2203                } else if self.consume_ident_ci("DOCUMENTS")? {
2204                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2205                        self, "document",
2206                    )?))
2207                } else if self.consume(&Token::Timeseries)?
2208                    || self.consume_ident_ci("TIMESERIES")?
2209                {
2210                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2211                        self,
2212                        "timeseries",
2213                    )?))
2214                } else if self.consume_ident_ci("GRAPHS")? {
2215                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2216                        self, "graph",
2217                    )?))
2218                } else if self.consume_ident_ci("CONFIGS")? {
2219                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2220                        self, "config",
2221                    )?))
2222                } else if self.consume_ident_ci("VAULTS")? {
2223                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2224                        self, "vault",
2225                    )?))
2226                } else if self.consume(&Token::Kv)?
2227                    || self.consume_ident_ci("KV")?
2228                    || self.consume_ident_ci("KVS")?
2229                {
2230                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2231                        self, "kv",
2232                    )?))
2233                } else if self.consume(&Token::Schema)? || self.consume_ident_ci("SCHEMA")? {
2234                    let collection = self.parse_dotted_admin_path(false)?;
2235                    let mut query = TableQuery::new("red.columns");
2236                    query.filter = Some(Filter::compare(
2237                        FieldRef::column("", "collection"),
2238                        CompareOp::Eq,
2239                        Value::text(collection),
2240                    ));
2241                    Ok(SqlCommand::Select(query))
2242                } else if self.consume_ident_ci("INDICES")? {
2243                    let mut query = TableQuery::new("red.indices");
2244                    if self.consume(&Token::On)? {
2245                        let collection = self.expect_ident_or_keyword()?;
2246                        let filter = Filter::Compare {
2247                            field: FieldRef::column("red.indices", "collection"),
2248                            op: CompareOp::Eq,
2249                            value: Value::text(collection),
2250                        };
2251                        query.where_expr = Some(filter_to_expr(&filter));
2252                        query.filter = Some(filter);
2253                    }
2254                    self.parse_table_clauses(&mut query)?;
2255                    Ok(SqlCommand::Select(query))
2256                } else if self.consume_ident_ci("POLICIES")? {
2257                    if self.consume(&Token::For)? || self.consume_ident_ci("FOR")? {
2258                        let principal = self.parse_iam_principal_kind()?;
2259                        return Ok(SqlCommand::IamPolicy(QueryExpr::ShowPolicies {
2260                            filter: Some(principal),
2261                        }));
2262                    }
2263                    let mut query = TableQuery::new("red.policies");
2264                    let collection_filter =
2265                        if self.consume(&Token::On)? || self.consume_ident_ci("ON")? {
2266                            let collection = self.parse_dotted_admin_path(false)?;
2267                            Some(Filter::Compare {
2268                                field: FieldRef::TableColumn {
2269                                    table: String::new(),
2270                                    column: "collection".to_string(),
2271                                },
2272                                op: CompareOp::Eq,
2273                                value: Value::text(collection),
2274                            })
2275                        } else {
2276                            None
2277                        };
2278                    self.parse_table_clauses(&mut query)?;
2279                    if let Some(collection_filter) = collection_filter {
2280                        let combined = match query.filter.take() {
2281                            Some(existing) => {
2282                                Filter::And(Box::new(collection_filter), Box::new(existing))
2283                            }
2284                            None => collection_filter,
2285                        };
2286                        query.where_expr = Some(filter_to_expr(&combined));
2287                        query.filter = Some(combined);
2288                    }
2289                    Ok(SqlCommand::Select(query))
2290                } else if self.consume_ident_ci("STATS")? {
2291                    let mut query = TableQuery::new("red.stats");
2292                    let collection = match self.peek().clone() {
2293                        Token::Ident(name) => {
2294                            self.advance()?;
2295                            Some(name)
2296                        }
2297                        Token::String(name) => {
2298                            self.advance()?;
2299                            Some(name)
2300                        }
2301                        _ => None,
2302                    };
2303                    self.parse_table_clauses(&mut query)?;
2304                    if let Some(collection) = collection {
2305                        let filter = Filter::compare(
2306                            FieldRef::column("red.stats", "collection"),
2307                            CompareOp::Eq,
2308                            Value::text(collection),
2309                        );
2310                        let expr = filter_to_expr(&filter);
2311                        query.where_expr = Some(match query.where_expr.take() {
2312                            Some(existing) => Expr::binop(BinOp::And, existing, expr),
2313                            None => expr,
2314                        });
2315                        query.filter = Some(match query.filter.take() {
2316                            Some(existing) => existing.and(filter),
2317                            None => filter,
2318                        });
2319                    }
2320                    Ok(SqlCommand::Select(query))
2321                } else if self.consume_ident_ci("SAMPLE")? {
2322                    let mut query = TableQuery::new(&self.expect_ident()?);
2323                    query.limit = if self.consume(&Token::Limit)? {
2324                        Some(self.parse_integer()? as u64)
2325                    } else {
2326                        Some(10)
2327                    };
2328                    Ok(SqlCommand::Select(query))
2329                } else if self.consume_ident_ci("SECRET")? || self.consume_ident_ci("SECRETS")? {
2330                    let prefix = if !self.check(&Token::Eof) {
2331                        Some(self.parse_dotted_admin_path(true)?)
2332                    } else {
2333                        None
2334                    };
2335                    Ok(SqlCommand::ShowSecrets { prefix })
2336                } else if self.consume_ident_ci("TENANT")? {
2337                    Ok(SqlCommand::ShowTenant)
2338                } else if let Some(expr) = self.parse_show_iam_after_show()? {
2339                    Ok(SqlCommand::IamPolicy(expr))
2340                } else {
2341                    Err(ParseError::expected(
2342                        vec![
2343                            "CONFIG",
2344                            "SECRET",
2345                            "SECRETS",
2346                            "COLLECTIONS",
2347                            "TABLES",
2348                            "QUEUES",
2349                            "VECTORS",
2350                            "DOCUMENTS",
2351                            "TIMESERIES",
2352                            "GRAPHS",
2353                            "KV",
2354                            "SCHEMA",
2355                            "INDICES",
2356                            "SAMPLE",
2357                            "POLICIES",
2358                            "STATS",
2359                            "TENANT",
2360                            "EFFECTIVE",
2361                        ],
2362                        self.peek(),
2363                        self.position(),
2364                    ))
2365                }
2366            }
2367            // Transaction control statements (Phase 1.1 PG parity).
2368            // BEGIN [WORK | TRANSACTION] [ISOLATION LEVEL <mode>]
2369            // START TRANSACTION [ISOLATION LEVEL <mode>]
2370            //
2371            // We only implement SNAPSHOT ISOLATION (our default). We
2372            // accept READ UNCOMMITTED / READ COMMITTED / REPEATABLE
2373            // READ / SNAPSHOT as PG-compatible no-ops, but reject
2374            // SERIALIZABLE outright — the previous behaviour of
2375            // silently degrading to snapshot made the parser
2376            // dishonest. Real SSI (Serializable Snapshot Isolation)
2377            // is tracked as a future milestone.
2378            Token::Begin | Token::Start => {
2379                self.advance()?;
2380                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2381                // Optional ISOLATION LEVEL clause.
2382                if self.consume_ident_ci("ISOLATION")? {
2383                    self.expect(Token::Level)?;
2384                    // The level identifier can span multiple words
2385                    // (READ UNCOMMITTED / READ COMMITTED / REPEATABLE
2386                    // READ). Collect them case-insensitively.
2387                    let mut parts: Vec<String> = Vec::new();
2388                    if self.consume_ident_ci("READ")? {
2389                        parts.push("READ".to_string());
2390                        if self.consume_ident_ci("UNCOMMITTED")? {
2391                            parts.push("UNCOMMITTED".to_string());
2392                        } else if self.consume_ident_ci("COMMITTED")? {
2393                            parts.push("COMMITTED".to_string());
2394                        } else {
2395                            return Err(ParseError::expected(
2396                                vec!["UNCOMMITTED", "COMMITTED"],
2397                                self.peek(),
2398                                self.position(),
2399                            ));
2400                        }
2401                    } else if self.consume_ident_ci("REPEATABLE")? {
2402                        parts.push("REPEATABLE".to_string());
2403                        if !self.consume_ident_ci("READ")? {
2404                            return Err(ParseError::expected(
2405                                vec!["READ"],
2406                                self.peek(),
2407                                self.position(),
2408                            ));
2409                        }
2410                        parts.push("READ".to_string());
2411                    } else if self.consume_ident_ci("SNAPSHOT")? {
2412                        parts.push("SNAPSHOT".to_string());
2413                    } else if self.consume_ident_ci("SERIALIZABLE")? {
2414                        return Err(ParseError::new(
2415                            "ISOLATION LEVEL SERIALIZABLE is not yet supported — reddb \
2416                             currently provides SNAPSHOT ISOLATION (which PG calls \
2417                             REPEATABLE READ). Use REPEATABLE READ / SNAPSHOT / \
2418                             READ COMMITTED, or omit ISOLATION LEVEL for the default."
2419                                .to_string(),
2420                            self.position(),
2421                        ));
2422                    } else {
2423                        return Err(ParseError::expected(
2424                            vec!["READ", "REPEATABLE", "SNAPSHOT", "SERIALIZABLE"],
2425                            self.peek(),
2426                            self.position(),
2427                        ));
2428                    }
2429                    // All accepted modes map to our snapshot engine today.
2430                    let _ = parts;
2431                }
2432                Ok(SqlCommand::TransactionControl(TxnControl::Begin))
2433            }
2434            // COMMIT [WORK | TRANSACTION]
2435            Token::Commit => {
2436                self.advance()?;
2437                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2438                Ok(SqlCommand::TransactionControl(TxnControl::Commit))
2439            }
2440            // ROLLBACK [WORK | TRANSACTION] [TO [SAVEPOINT] name]
2441            // ROLLBACK MIGRATION name
2442            Token::Rollback => {
2443                self.advance()?;
2444                if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
2445                    match self.parse_rollback_migration_after_keyword()? {
2446                        QueryExpr::RollbackMigration(q) => Ok(SqlCommand::RollbackMigration(q)),
2447                        other => Err(ParseError::new(
2448                            format!(
2449                                "internal: ROLLBACK MIGRATION produced unexpected kind {other:?}"
2450                            ),
2451                            self.position(),
2452                        )),
2453                    }
2454                } else {
2455                    let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2456                    if self.consume(&Token::To)? {
2457                        let _ = self.consume(&Token::Savepoint)?;
2458                        let name = self.expect_ident()?;
2459                        Ok(SqlCommand::TransactionControl(
2460                            TxnControl::RollbackToSavepoint(name),
2461                        ))
2462                    } else {
2463                        Ok(SqlCommand::TransactionControl(TxnControl::Rollback))
2464                    }
2465                }
2466            }
2467            // SAVEPOINT name
2468            Token::Savepoint => {
2469                self.advance()?;
2470                let name = self.expect_ident()?;
2471                Ok(SqlCommand::TransactionControl(TxnControl::Savepoint(name)))
2472            }
2473            // RELEASE [SAVEPOINT] name
2474            Token::Release => {
2475                self.advance()?;
2476                let _ = self.consume(&Token::Savepoint)?;
2477                let name = self.expect_ident()?;
2478                Ok(SqlCommand::TransactionControl(
2479                    TxnControl::ReleaseSavepoint(name),
2480                ))
2481            }
2482            // VACUUM [FULL] [table]
2483            Token::Vacuum => {
2484                self.advance()?;
2485                let full = self.consume(&Token::Full)?;
2486                let target = if self.check(&Token::Eof) {
2487                    None
2488                } else {
2489                    Some(self.expect_ident()?)
2490                };
2491                Ok(SqlCommand::Maintenance(MaintenanceCommand::Vacuum {
2492                    target,
2493                    full,
2494                }))
2495            }
2496            // REFRESH MATERIALIZED VIEW name
2497            Token::Refresh => {
2498                self.advance()?;
2499                self.expect(Token::Materialized)?;
2500                self.expect(Token::View)?;
2501                let name = self.expect_ident()?;
2502                Ok(SqlCommand::RefreshMaterializedView(
2503                    RefreshMaterializedViewQuery { name },
2504                ))
2505            }
2506            // ANALYZE [table]
2507            Token::Analyze => {
2508                self.advance()?;
2509                let target = if self.check(&Token::Eof) {
2510                    None
2511                } else {
2512                    Some(self.expect_ident()?)
2513                };
2514                Ok(SqlCommand::Maintenance(MaintenanceCommand::Analyze {
2515                    target,
2516                }))
2517            }
2518            // COPY table FROM 'path' [WITH (...)] [DELIMITER 'x'] [HEADER [true|false]]
2519            //
2520            // Accepts both PG-style `WITH (FORMAT csv, HEADER true)` and the
2521            // short-form `DELIMITER ',' HEADER`. The only supported format
2522            // today is CSV.
2523            Token::Copy => {
2524                self.advance()?;
2525                let table = self.expect_ident()?;
2526                self.expect(Token::From)?;
2527                let path = self.parse_string()?;
2528
2529                let mut delimiter: Option<char> = None;
2530                let mut has_header = false;
2531                let format = CopyFormat::Csv;
2532
2533                // Optional `WITH (FORMAT csv, HEADER true, DELIMITER ',')` block.
2534                // `WITH` is a reserved keyword token — accept both the keyword
2535                // form and the ident form that non-CTE callers sometimes emit.
2536                if self.consume(&Token::With)? || self.consume_ident_ci("WITH")? {
2537                    self.expect(Token::LParen)?;
2538                    loop {
2539                        if self.consume(&Token::Format)? || self.consume_ident_ci("FORMAT")? {
2540                            let _ = self.consume(&Token::Eq)?;
2541                            // Only CSV for now — accept the ident and move on.
2542                            let _ = self.expect_ident()?;
2543                        } else if self.consume(&Token::Header)? {
2544                            let _ = self.consume(&Token::Eq)?;
2545                            // Accept `HEADER`, `HEADER = true`, `HEADER = false`,
2546                            // or an ident spelling of true/false.
2547                            has_header = match self.peek().clone() {
2548                                Token::True => {
2549                                    self.advance()?;
2550                                    true
2551                                }
2552                                Token::False => {
2553                                    self.advance()?;
2554                                    false
2555                                }
2556                                Token::Ident(ref n) if n.eq_ignore_ascii_case("true") => {
2557                                    self.advance()?;
2558                                    true
2559                                }
2560                                Token::Ident(ref n) if n.eq_ignore_ascii_case("false") => {
2561                                    self.advance()?;
2562                                    false
2563                                }
2564                                _ => true,
2565                            };
2566                        } else if self.consume(&Token::Delimiter)? {
2567                            let _ = self.consume(&Token::Eq)?;
2568                            let s = self.parse_string()?;
2569                            delimiter = s.chars().next();
2570                        } else {
2571                            break;
2572                        }
2573                        if !self.consume(&Token::Comma)? {
2574                            break;
2575                        }
2576                    }
2577                    self.expect(Token::RParen)?;
2578                }
2579
2580                // Short form clauses outside WITH (in either order).
2581                loop {
2582                    if self.consume(&Token::Delimiter)? {
2583                        let s = self.parse_string()?;
2584                        delimiter = s.chars().next();
2585                    } else if self.consume(&Token::Header)? {
2586                        has_header = true;
2587                    } else {
2588                        break;
2589                    }
2590                }
2591
2592                Ok(SqlCommand::CopyFrom(CopyFromQuery {
2593                    table,
2594                    path,
2595                    format,
2596                    delimiter,
2597                    has_header,
2598                }))
2599            }
2600            other => Err(ParseError::expected(
2601                vec![
2602                    "SELECT",
2603                    "FROM",
2604                    "INSERT",
2605                    "UPDATE",
2606                    "DELETE",
2607                    "EXPLAIN",
2608                    "CREATE",
2609                    "DROP",
2610                    "ALTER",
2611                    "SET",
2612                    "SHOW",
2613                    "BEGIN",
2614                    "COMMIT",
2615                    "ROLLBACK",
2616                    "SAVEPOINT",
2617                    "RELEASE",
2618                    "START",
2619                    "VACUUM",
2620                    "ANALYZE",
2621                    "COPY",
2622                    "REFRESH",
2623                ],
2624                other,
2625                self.position(),
2626            )),
2627        }
2628    }
2629}