Skip to main content

reddb_server/storage/query/
sql.rs

1use crate::catalog::CollectionModel;
2use crate::storage::query::ast::{
3    AlterQueueQuery, AlterTableQuery, AlterUserStmt, ApplyMigrationQuery, AskQuery, BinOp,
4    CompareOp, ConfigCommand, CopyFormat, CopyFromQuery, CreateCollectionQuery,
5    CreateForeignTableQuery, CreateIndexQuery, CreateMigrationQuery, CreatePolicyQuery,
6    CreateQueueQuery, CreateSchemaQuery, CreateSequenceQuery, CreateServerQuery, CreateTableQuery,
7    CreateTimeSeriesQuery, CreateTreeQuery, CreateVectorQuery, CreateViewQuery, DeleteQuery,
8    DropCollectionQuery, DropDocumentQuery, DropForeignTableQuery, DropGraphQuery, DropIndexQuery,
9    DropKvQuery, DropPolicyQuery, DropQueueQuery, DropSchemaQuery, DropSequenceQuery,
10    DropServerQuery, DropTableQuery, DropTimeSeriesQuery, DropTreeQuery, DropVectorQuery,
11    DropViewQuery, EventsBackfillQuery, ExplainAlterQuery, ExplainMigrationQuery, Expr, FieldRef,
12    Filter, ForeignColumnDef, GrantStmt, GraphCommand, GraphQuery, HybridQuery, InsertQuery,
13    JoinQuery, KvCommand, MaintenanceCommand, PathQuery, PolicyAction, ProbabilisticCommand,
14    QueryExpr, QueueCommand, QueueSelectQuery, RefreshMaterializedViewQuery, RevokeStmt,
15    RollbackMigrationQuery, SearchCommand, Span, TableQuery, TreeCommand, TruncateQuery,
16    TxnControl, UpdateQuery, VectorQuery,
17};
18use crate::storage::query::parser::{ParseError, Parser, SafeTokenDisplay};
19use crate::storage::query::sql_lowering::filter_to_expr;
20use crate::storage::query::Token;
21use crate::storage::schema::Value;
22
23/// Canonical SQL frontend command surface.
24///
25/// This is the single entrypoint for SQL/RQL-style commands before they are
26/// lowered into the broader multi-backend `QueryExpr` space.
27#[derive(Debug, Clone)]
28pub enum SqlStatement {
29    Query(SqlQuery),
30    Mutation(SqlMutation),
31    Schema(SqlSchemaCommand),
32    Admin(SqlAdminCommand),
33}
34
35#[derive(Debug, Clone)]
36#[allow(clippy::large_enum_variant)]
37pub enum FrontendStatement {
38    Sql(SqlStatement),
39    Graph(GraphQuery),
40    GraphCommand(GraphCommand),
41    Path(PathQuery),
42    Vector(VectorQuery),
43    Hybrid(HybridQuery),
44    Search(SearchCommand),
45    Ask(AskQuery),
46    QueueSelect(QueueSelectQuery),
47    QueueCommand(QueueCommand),
48    EventsBackfill(EventsBackfillQuery),
49    EventsBackfillStatus { collection: String },
50    TreeCommand(TreeCommand),
51    ProbabilisticCommand(ProbabilisticCommand),
52    KvCommand(KvCommand),
53    ConfigCommand(ConfigCommand),
54}
55
56#[derive(Debug, Clone)]
57pub enum SqlCommand {
58    Select(TableQuery),
59    Join(JoinQuery),
60    Insert(InsertQuery),
61    Update(UpdateQuery),
62    Delete(DeleteQuery),
63    ExplainAlter(ExplainAlterQuery),
64    CreateTable(CreateTableQuery),
65    CreateCollection(CreateCollectionQuery),
66    CreateVector(CreateVectorQuery),
67    DropTable(DropTableQuery),
68    DropGraph(DropGraphQuery),
69    DropVector(DropVectorQuery),
70    DropDocument(DropDocumentQuery),
71    DropKv(DropKvQuery),
72    DropCollection(DropCollectionQuery),
73    Truncate(TruncateQuery),
74    AlterTable(AlterTableQuery),
75    CreateIndex(CreateIndexQuery),
76    DropIndex(DropIndexQuery),
77    CreateTimeSeries(CreateTimeSeriesQuery),
78    DropTimeSeries(DropTimeSeriesQuery),
79    CreateQueue(CreateQueueQuery),
80    AlterQueue(AlterQueueQuery),
81    DropQueue(DropQueueQuery),
82    CreateTree(CreateTreeQuery),
83    DropTree(DropTreeQuery),
84    Probabilistic(ProbabilisticCommand),
85    SetConfig {
86        key: String,
87        value: Value,
88    },
89    ShowConfig {
90        prefix: Option<String>,
91    },
92    SetSecret {
93        key: String,
94        value: Value,
95    },
96    DeleteSecret {
97        key: String,
98    },
99    ShowSecrets {
100        prefix: Option<String>,
101    },
102    SetTenant(Option<String>),
103    ShowTenant,
104    TransactionControl(TxnControl),
105    Maintenance(MaintenanceCommand),
106    CreateSchema(CreateSchemaQuery),
107    DropSchema(DropSchemaQuery),
108    CreateSequence(CreateSequenceQuery),
109    DropSequence(DropSequenceQuery),
110    CopyFrom(CopyFromQuery),
111    CreateView(CreateViewQuery),
112    DropView(DropViewQuery),
113    RefreshMaterializedView(RefreshMaterializedViewQuery),
114    CreatePolicy(CreatePolicyQuery),
115    DropPolicy(DropPolicyQuery),
116    CreateServer(CreateServerQuery),
117    DropServer(DropServerQuery),
118    CreateForeignTable(CreateForeignTableQuery),
119    DropForeignTable(DropForeignTableQuery),
120    /// `GRANT … ON … TO …`
121    Grant(GrantStmt),
122    /// `REVOKE … ON … FROM …`
123    Revoke(RevokeStmt),
124    /// `ALTER USER name <attrs>`
125    AlterUser(AlterUserStmt),
126    /// IAM policy DDL (CREATE POLICY '...' AS '...', DROP POLICY '...',
127    /// ATTACH/DETACH POLICY, SHOW POLICIES, SIMULATE, SHOW EFFECTIVE
128    /// PERMISSIONS). Stored as a pre-built QueryExpr so the dispatcher
129    /// can route the multitude of shapes through a single arm.
130    IamPolicy(QueryExpr),
131    CreateMigration(CreateMigrationQuery),
132    ApplyMigration(ApplyMigrationQuery),
133    RollbackMigration(RollbackMigrationQuery),
134    ExplainMigration(ExplainMigrationQuery),
135}
136
137fn collection_model_filter(model: &str) -> Filter {
138    Filter::Compare {
139        field: FieldRef::column("", "model"),
140        op: CompareOp::Eq,
141        value: Value::Text(model.to_string().into()),
142    }
143}
144
145fn add_table_filter(query: &mut TableQuery, filter: Filter) {
146    let combined = match query.filter.take() {
147        Some(existing) => existing.and(filter),
148        None => filter,
149    };
150    query.where_expr = Some(filter_to_expr(&combined));
151    query.filter = Some(combined);
152}
153
154fn parse_show_collections_by_model(
155    parser: &mut Parser<'_>,
156    model: &str,
157) -> Result<TableQuery, ParseError> {
158    let mut query = TableQuery::new("red.collections");
159    parser.parse_table_clauses(&mut query)?;
160    add_table_filter(&mut query, collection_model_filter(model));
161    Ok(query)
162}
163
164#[derive(Debug, Clone)]
165#[allow(clippy::large_enum_variant)]
166pub enum SqlQuery {
167    Select(TableQuery),
168    Join(JoinQuery),
169}
170
171#[derive(Debug, Clone)]
172pub enum SqlMutation {
173    Insert(InsertQuery),
174    Update(UpdateQuery),
175    Delete(DeleteQuery),
176}
177
178#[derive(Debug, Clone)]
179pub enum SqlSchemaCommand {
180    ExplainAlter(ExplainAlterQuery),
181    CreateTable(CreateTableQuery),
182    CreateCollection(CreateCollectionQuery),
183    CreateVector(CreateVectorQuery),
184    DropTable(DropTableQuery),
185    DropGraph(DropGraphQuery),
186    DropVector(DropVectorQuery),
187    DropDocument(DropDocumentQuery),
188    DropKv(DropKvQuery),
189    DropCollection(DropCollectionQuery),
190    Truncate(TruncateQuery),
191    AlterTable(AlterTableQuery),
192    CreateIndex(CreateIndexQuery),
193    DropIndex(DropIndexQuery),
194    CreateTimeSeries(CreateTimeSeriesQuery),
195    DropTimeSeries(DropTimeSeriesQuery),
196    CreateQueue(CreateQueueQuery),
197    AlterQueue(AlterQueueQuery),
198    DropQueue(DropQueueQuery),
199    CreateTree(CreateTreeQuery),
200    DropTree(DropTreeQuery),
201    Probabilistic(ProbabilisticCommand),
202    CreateSchema(CreateSchemaQuery),
203    DropSchema(DropSchemaQuery),
204    CreateSequence(CreateSequenceQuery),
205    DropSequence(DropSequenceQuery),
206    CopyFrom(CopyFromQuery),
207    CreateView(CreateViewQuery),
208    DropView(DropViewQuery),
209    RefreshMaterializedView(RefreshMaterializedViewQuery),
210    CreatePolicy(CreatePolicyQuery),
211    DropPolicy(DropPolicyQuery),
212    CreateServer(CreateServerQuery),
213    DropServer(DropServerQuery),
214    CreateForeignTable(CreateForeignTableQuery),
215    DropForeignTable(DropForeignTableQuery),
216    CreateMigration(CreateMigrationQuery),
217    ApplyMigration(ApplyMigrationQuery),
218    RollbackMigration(RollbackMigrationQuery),
219    ExplainMigration(ExplainMigrationQuery),
220}
221
222#[derive(Debug, Clone)]
223#[allow(clippy::large_enum_variant)]
224pub enum SqlAdminCommand {
225    SetConfig { key: String, value: Value },
226    ShowConfig { prefix: Option<String> },
227    SetSecret { key: String, value: Value },
228    DeleteSecret { key: String },
229    ShowSecrets { prefix: Option<String> },
230    SetTenant(Option<String>),
231    ShowTenant,
232    TransactionControl(TxnControl),
233    Maintenance(MaintenanceCommand),
234    Grant(GrantStmt),
235    Revoke(RevokeStmt),
236    AlterUser(AlterUserStmt),
237    IamPolicy(QueryExpr),
238}
239
240impl SqlStatement {
241    pub fn into_command(self) -> SqlCommand {
242        match self {
243            SqlStatement::Query(SqlQuery::Select(query)) => SqlCommand::Select(query),
244            SqlStatement::Query(SqlQuery::Join(query)) => SqlCommand::Join(query),
245            SqlStatement::Mutation(SqlMutation::Insert(query)) => SqlCommand::Insert(query),
246            SqlStatement::Mutation(SqlMutation::Update(query)) => SqlCommand::Update(query),
247            SqlStatement::Mutation(SqlMutation::Delete(query)) => SqlCommand::Delete(query),
248            SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query)) => {
249                SqlCommand::ExplainAlter(query)
250            }
251            SqlStatement::Schema(SqlSchemaCommand::CreateTable(query)) => {
252                SqlCommand::CreateTable(query)
253            }
254            SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query)) => {
255                SqlCommand::CreateCollection(query)
256            }
257            SqlStatement::Schema(SqlSchemaCommand::CreateVector(query)) => {
258                SqlCommand::CreateVector(query)
259            }
260            SqlStatement::Schema(SqlSchemaCommand::DropTable(query)) => {
261                SqlCommand::DropTable(query)
262            }
263            SqlStatement::Schema(SqlSchemaCommand::DropGraph(query)) => {
264                SqlCommand::DropGraph(query)
265            }
266            SqlStatement::Schema(SqlSchemaCommand::DropVector(query)) => {
267                SqlCommand::DropVector(query)
268            }
269            SqlStatement::Schema(SqlSchemaCommand::DropDocument(query)) => {
270                SqlCommand::DropDocument(query)
271            }
272            SqlStatement::Schema(SqlSchemaCommand::DropKv(query)) => SqlCommand::DropKv(query),
273            SqlStatement::Schema(SqlSchemaCommand::DropCollection(query)) => {
274                SqlCommand::DropCollection(query)
275            }
276            SqlStatement::Schema(SqlSchemaCommand::Truncate(query)) => SqlCommand::Truncate(query),
277            SqlStatement::Schema(SqlSchemaCommand::AlterTable(query)) => {
278                SqlCommand::AlterTable(query)
279            }
280            SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query)) => {
281                SqlCommand::CreateIndex(query)
282            }
283            SqlStatement::Schema(SqlSchemaCommand::DropIndex(query)) => {
284                SqlCommand::DropIndex(query)
285            }
286            SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query)) => {
287                SqlCommand::CreateTimeSeries(query)
288            }
289            SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query)) => {
290                SqlCommand::DropTimeSeries(query)
291            }
292            SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query)) => {
293                SqlCommand::CreateQueue(query)
294            }
295            SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query)) => {
296                SqlCommand::AlterQueue(query)
297            }
298            SqlStatement::Schema(SqlSchemaCommand::DropQueue(query)) => {
299                SqlCommand::DropQueue(query)
300            }
301            SqlStatement::Schema(SqlSchemaCommand::CreateTree(query)) => {
302                SqlCommand::CreateTree(query)
303            }
304            SqlStatement::Schema(SqlSchemaCommand::DropTree(query)) => SqlCommand::DropTree(query),
305            SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command)) => {
306                SqlCommand::Probabilistic(command)
307            }
308            SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value }) => {
309                SqlCommand::SetConfig { key, value }
310            }
311            SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix }) => {
312                SqlCommand::ShowConfig { prefix }
313            }
314            SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value }) => {
315                SqlCommand::SetSecret { key, value }
316            }
317            SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key }) => {
318                SqlCommand::DeleteSecret { key }
319            }
320            SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix }) => {
321                SqlCommand::ShowSecrets { prefix }
322            }
323            SqlStatement::Admin(SqlAdminCommand::SetTenant(value)) => SqlCommand::SetTenant(value),
324            SqlStatement::Admin(SqlAdminCommand::ShowTenant) => SqlCommand::ShowTenant,
325            SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl)) => {
326                SqlCommand::TransactionControl(ctl)
327            }
328            SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)) => SqlCommand::Maintenance(cmd),
329            SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)) => SqlCommand::CreateSchema(q),
330            SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)) => SqlCommand::DropSchema(q),
331            SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q)) => {
332                SqlCommand::CreateSequence(q)
333            }
334            SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)) => SqlCommand::DropSequence(q),
335            SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)) => SqlCommand::CopyFrom(q),
336            SqlStatement::Schema(SqlSchemaCommand::CreateView(q)) => SqlCommand::CreateView(q),
337            SqlStatement::Schema(SqlSchemaCommand::DropView(q)) => SqlCommand::DropView(q),
338            SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q)) => {
339                SqlCommand::RefreshMaterializedView(q)
340            }
341            SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)) => SqlCommand::CreatePolicy(q),
342            SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)) => SqlCommand::DropPolicy(q),
343            SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)) => SqlCommand::CreateServer(q),
344            SqlStatement::Schema(SqlSchemaCommand::DropServer(q)) => SqlCommand::DropServer(q),
345            SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q)) => {
346                SqlCommand::CreateForeignTable(q)
347            }
348            SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q)) => {
349                SqlCommand::DropForeignTable(q)
350            }
351            SqlStatement::Admin(SqlAdminCommand::Grant(s)) => SqlCommand::Grant(s),
352            SqlStatement::Admin(SqlAdminCommand::Revoke(s)) => SqlCommand::Revoke(s),
353            SqlStatement::Admin(SqlAdminCommand::AlterUser(s)) => SqlCommand::AlterUser(s),
354            SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)) => SqlCommand::IamPolicy(e),
355            SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q)) => {
356                SqlCommand::CreateMigration(q)
357            }
358            SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q)) => {
359                SqlCommand::ApplyMigration(q)
360            }
361            SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q)) => {
362                SqlCommand::RollbackMigration(q)
363            }
364            SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q)) => {
365                SqlCommand::ExplainMigration(q)
366            }
367        }
368    }
369
370    pub fn into_query_expr(self) -> QueryExpr {
371        self.into_command().into_query_expr()
372    }
373}
374
375impl FrontendStatement {
376    pub fn into_query_expr(self) -> QueryExpr {
377        match self {
378            FrontendStatement::Sql(statement) => statement.into_query_expr(),
379            FrontendStatement::Graph(query) => QueryExpr::Graph(query),
380            FrontendStatement::GraphCommand(command) => QueryExpr::GraphCommand(command),
381            FrontendStatement::Path(query) => QueryExpr::Path(query),
382            FrontendStatement::Vector(query) => QueryExpr::Vector(query),
383            FrontendStatement::Hybrid(query) => QueryExpr::Hybrid(query),
384            FrontendStatement::Search(command) => QueryExpr::SearchCommand(command),
385            FrontendStatement::Ask(query) => QueryExpr::Ask(query),
386            FrontendStatement::QueueSelect(query) => QueryExpr::QueueSelect(query),
387            FrontendStatement::QueueCommand(command) => QueryExpr::QueueCommand(command),
388            FrontendStatement::EventsBackfill(query) => QueryExpr::EventsBackfill(query),
389            FrontendStatement::EventsBackfillStatus { collection } => {
390                QueryExpr::EventsBackfillStatus { collection }
391            }
392            FrontendStatement::TreeCommand(command) => QueryExpr::TreeCommand(command),
393            FrontendStatement::ProbabilisticCommand(command) => {
394                QueryExpr::ProbabilisticCommand(command)
395            }
396            FrontendStatement::KvCommand(command) => QueryExpr::KvCommand(command),
397            FrontendStatement::ConfigCommand(command) => QueryExpr::ConfigCommand(command),
398        }
399    }
400}
401
402pub fn parse_frontend(input: &str) -> Result<FrontendStatement, ParseError> {
403    let mut parser = Parser::new(input)?;
404    let statement = parser.parse_frontend_statement()?;
405    if !parser.check(&Token::Eof) {
406        return Err(ParseError::new(
407            // F-05: `Token::Ident` / `Token::String` / `Token::JsonLiteral`
408            // Display arms emit raw user bytes. Render via `{:?}` so
409            // embedded CR/LF/NUL/quotes are escaped before the message
410            // reaches downstream JSON / audit / log / gRPC sinks.
411            format!("Unexpected token after query: {:?}", parser.current.token),
412            parser.position(),
413        ));
414    }
415    Ok(statement)
416}
417
418impl SqlCommand {
419    pub fn into_query_expr(self) -> QueryExpr {
420        match self {
421            SqlCommand::Select(query) => QueryExpr::Table(query),
422            SqlCommand::Join(query) => QueryExpr::Join(query),
423            SqlCommand::Insert(query) => QueryExpr::Insert(query),
424            SqlCommand::Update(query) => QueryExpr::Update(query),
425            SqlCommand::Delete(query) => QueryExpr::Delete(query),
426            SqlCommand::ExplainAlter(query) => QueryExpr::ExplainAlter(query),
427            SqlCommand::CreateTable(query) => QueryExpr::CreateTable(query),
428            SqlCommand::CreateCollection(query) => QueryExpr::CreateCollection(query),
429            SqlCommand::CreateVector(query) => QueryExpr::CreateVector(query),
430            SqlCommand::DropTable(query) => QueryExpr::DropTable(query),
431            SqlCommand::DropGraph(query) => QueryExpr::DropGraph(query),
432            SqlCommand::DropVector(query) => QueryExpr::DropVector(query),
433            SqlCommand::DropDocument(query) => QueryExpr::DropDocument(query),
434            SqlCommand::DropKv(query) => QueryExpr::DropKv(query),
435            SqlCommand::DropCollection(query) => QueryExpr::DropCollection(query),
436            SqlCommand::Truncate(query) => QueryExpr::Truncate(query),
437            SqlCommand::AlterTable(query) => QueryExpr::AlterTable(query),
438            SqlCommand::CreateIndex(query) => QueryExpr::CreateIndex(query),
439            SqlCommand::DropIndex(query) => QueryExpr::DropIndex(query),
440            SqlCommand::CreateTimeSeries(query) => QueryExpr::CreateTimeSeries(query),
441            SqlCommand::DropTimeSeries(query) => QueryExpr::DropTimeSeries(query),
442            SqlCommand::CreateQueue(query) => QueryExpr::CreateQueue(query),
443            SqlCommand::AlterQueue(query) => QueryExpr::AlterQueue(query),
444            SqlCommand::DropQueue(query) => QueryExpr::DropQueue(query),
445            SqlCommand::CreateTree(query) => QueryExpr::CreateTree(query),
446            SqlCommand::DropTree(query) => QueryExpr::DropTree(query),
447            SqlCommand::Probabilistic(command) => QueryExpr::ProbabilisticCommand(command),
448            SqlCommand::SetConfig { key, value } => QueryExpr::SetConfig { key, value },
449            SqlCommand::ShowConfig { prefix } => QueryExpr::ShowConfig { prefix },
450            SqlCommand::SetSecret { key, value } => QueryExpr::SetSecret { key, value },
451            SqlCommand::DeleteSecret { key } => QueryExpr::DeleteSecret { key },
452            SqlCommand::ShowSecrets { prefix } => QueryExpr::ShowSecrets { prefix },
453            SqlCommand::SetTenant(value) => QueryExpr::SetTenant(value),
454            SqlCommand::ShowTenant => QueryExpr::ShowTenant,
455            SqlCommand::TransactionControl(ctl) => QueryExpr::TransactionControl(ctl),
456            SqlCommand::Maintenance(cmd) => QueryExpr::MaintenanceCommand(cmd),
457            SqlCommand::CreateSchema(q) => QueryExpr::CreateSchema(q),
458            SqlCommand::DropSchema(q) => QueryExpr::DropSchema(q),
459            SqlCommand::CreateSequence(q) => QueryExpr::CreateSequence(q),
460            SqlCommand::DropSequence(q) => QueryExpr::DropSequence(q),
461            SqlCommand::CopyFrom(q) => QueryExpr::CopyFrom(q),
462            SqlCommand::CreateView(q) => QueryExpr::CreateView(q),
463            SqlCommand::DropView(q) => QueryExpr::DropView(q),
464            SqlCommand::RefreshMaterializedView(q) => QueryExpr::RefreshMaterializedView(q),
465            SqlCommand::CreatePolicy(q) => QueryExpr::CreatePolicy(q),
466            SqlCommand::DropPolicy(q) => QueryExpr::DropPolicy(q),
467            SqlCommand::CreateServer(q) => QueryExpr::CreateServer(q),
468            SqlCommand::DropServer(q) => QueryExpr::DropServer(q),
469            SqlCommand::CreateForeignTable(q) => QueryExpr::CreateForeignTable(q),
470            SqlCommand::DropForeignTable(q) => QueryExpr::DropForeignTable(q),
471            SqlCommand::Grant(s) => QueryExpr::Grant(s),
472            SqlCommand::Revoke(s) => QueryExpr::Revoke(s),
473            SqlCommand::AlterUser(s) => QueryExpr::AlterUser(s),
474            SqlCommand::IamPolicy(e) => e,
475            SqlCommand::CreateMigration(q) => QueryExpr::CreateMigration(q),
476            SqlCommand::ApplyMigration(q) => QueryExpr::ApplyMigration(q),
477            SqlCommand::RollbackMigration(q) => QueryExpr::RollbackMigration(q),
478            SqlCommand::ExplainMigration(q) => QueryExpr::ExplainMigration(q),
479        }
480    }
481
482    pub fn into_statement(self) -> SqlStatement {
483        match self {
484            SqlCommand::Select(query) => SqlStatement::Query(SqlQuery::Select(query)),
485            SqlCommand::Join(query) => SqlStatement::Query(SqlQuery::Join(query)),
486            SqlCommand::Insert(query) => SqlStatement::Mutation(SqlMutation::Insert(query)),
487            SqlCommand::Update(query) => SqlStatement::Mutation(SqlMutation::Update(query)),
488            SqlCommand::Delete(query) => SqlStatement::Mutation(SqlMutation::Delete(query)),
489            SqlCommand::ExplainAlter(query) => {
490                SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query))
491            }
492            SqlCommand::CreateTable(query) => {
493                SqlStatement::Schema(SqlSchemaCommand::CreateTable(query))
494            }
495            SqlCommand::CreateCollection(query) => {
496                SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query))
497            }
498            SqlCommand::CreateVector(query) => {
499                SqlStatement::Schema(SqlSchemaCommand::CreateVector(query))
500            }
501            SqlCommand::DropTable(query) => {
502                SqlStatement::Schema(SqlSchemaCommand::DropTable(query))
503            }
504            SqlCommand::DropGraph(query) => {
505                SqlStatement::Schema(SqlSchemaCommand::DropGraph(query))
506            }
507            SqlCommand::DropVector(query) => {
508                SqlStatement::Schema(SqlSchemaCommand::DropVector(query))
509            }
510            SqlCommand::DropDocument(query) => {
511                SqlStatement::Schema(SqlSchemaCommand::DropDocument(query))
512            }
513            SqlCommand::DropKv(query) => SqlStatement::Schema(SqlSchemaCommand::DropKv(query)),
514            SqlCommand::DropCollection(query) => {
515                SqlStatement::Schema(SqlSchemaCommand::DropCollection(query))
516            }
517            SqlCommand::Truncate(query) => SqlStatement::Schema(SqlSchemaCommand::Truncate(query)),
518            SqlCommand::AlterTable(query) => {
519                SqlStatement::Schema(SqlSchemaCommand::AlterTable(query))
520            }
521            SqlCommand::CreateIndex(query) => {
522                SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query))
523            }
524            SqlCommand::DropIndex(query) => {
525                SqlStatement::Schema(SqlSchemaCommand::DropIndex(query))
526            }
527            SqlCommand::CreateTimeSeries(query) => {
528                SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query))
529            }
530            SqlCommand::DropTimeSeries(query) => {
531                SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query))
532            }
533            SqlCommand::CreateQueue(query) => {
534                SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query))
535            }
536            SqlCommand::AlterQueue(query) => {
537                SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query))
538            }
539            SqlCommand::DropQueue(query) => {
540                SqlStatement::Schema(SqlSchemaCommand::DropQueue(query))
541            }
542            SqlCommand::CreateTree(query) => {
543                SqlStatement::Schema(SqlSchemaCommand::CreateTree(query))
544            }
545            SqlCommand::DropTree(query) => SqlStatement::Schema(SqlSchemaCommand::DropTree(query)),
546            SqlCommand::Probabilistic(command) => {
547                SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command))
548            }
549            SqlCommand::SetConfig { key, value } => {
550                SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value })
551            }
552            SqlCommand::ShowConfig { prefix } => {
553                SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix })
554            }
555            SqlCommand::SetSecret { key, value } => {
556                SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value })
557            }
558            SqlCommand::DeleteSecret { key } => {
559                SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key })
560            }
561            SqlCommand::ShowSecrets { prefix } => {
562                SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix })
563            }
564            SqlCommand::SetTenant(value) => SqlStatement::Admin(SqlAdminCommand::SetTenant(value)),
565            SqlCommand::ShowTenant => SqlStatement::Admin(SqlAdminCommand::ShowTenant),
566            SqlCommand::TransactionControl(ctl) => {
567                SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl))
568            }
569            SqlCommand::Maintenance(cmd) => SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)),
570            SqlCommand::CreateSchema(q) => SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)),
571            SqlCommand::DropSchema(q) => SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)),
572            SqlCommand::CreateSequence(q) => {
573                SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q))
574            }
575            SqlCommand::DropSequence(q) => SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)),
576            SqlCommand::CopyFrom(q) => SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)),
577            SqlCommand::CreateView(q) => SqlStatement::Schema(SqlSchemaCommand::CreateView(q)),
578            SqlCommand::DropView(q) => SqlStatement::Schema(SqlSchemaCommand::DropView(q)),
579            SqlCommand::RefreshMaterializedView(q) => {
580                SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q))
581            }
582            SqlCommand::CreatePolicy(q) => SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)),
583            SqlCommand::DropPolicy(q) => SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)),
584            SqlCommand::CreateServer(q) => SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)),
585            SqlCommand::DropServer(q) => SqlStatement::Schema(SqlSchemaCommand::DropServer(q)),
586            SqlCommand::CreateForeignTable(q) => {
587                SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q))
588            }
589            SqlCommand::DropForeignTable(q) => {
590                SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q))
591            }
592            SqlCommand::Grant(s) => SqlStatement::Admin(SqlAdminCommand::Grant(s)),
593            SqlCommand::Revoke(s) => SqlStatement::Admin(SqlAdminCommand::Revoke(s)),
594            SqlCommand::AlterUser(s) => SqlStatement::Admin(SqlAdminCommand::AlterUser(s)),
595            SqlCommand::IamPolicy(e) => SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)),
596            SqlCommand::CreateMigration(q) => {
597                SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q))
598            }
599            SqlCommand::ApplyMigration(q) => {
600                SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q))
601            }
602            SqlCommand::RollbackMigration(q) => {
603                SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q))
604            }
605            SqlCommand::ExplainMigration(q) => {
606                SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q))
607            }
608        }
609    }
610}
611
612impl<'a> Parser<'a> {
613    fn parse_events_command(&mut self) -> Result<QueryExpr, ParseError> {
614        self.expect_ident()?; // EVENTS
615        if self.consume_ident_ci("STATUS")? {
616            let mut query = TableQuery::new("red.subscriptions");
617            let collection = match self.peek().clone() {
618                Token::Ident(name) => {
619                    self.advance()?;
620                    Some(name)
621                }
622                Token::String(name) => {
623                    self.advance()?;
624                    Some(name)
625                }
626                _ => None,
627            };
628            self.parse_table_clauses(&mut query)?;
629            if let Some(collection) = collection {
630                let filter = Filter::compare(
631                    FieldRef::column("red.subscriptions", "collection"),
632                    CompareOp::Eq,
633                    Value::text(collection),
634                );
635                let expr = filter_to_expr(&filter);
636                query.where_expr = Some(match query.where_expr.take() {
637                    Some(existing) => Expr::binop(BinOp::And, existing, expr),
638                    None => expr,
639                });
640                query.filter = Some(match query.filter.take() {
641                    Some(existing) => existing.and(filter),
642                    None => filter,
643                });
644            }
645            return Ok(QueryExpr::Table(query));
646        }
647
648        if !self.consume_ident_ci("BACKFILL")? {
649            return Err(ParseError::expected(
650                vec!["BACKFILL", "STATUS"],
651                self.peek(),
652                self.position(),
653            ));
654        }
655
656        if self.consume_ident_ci("STATUS")? {
657            let collection = self.expect_ident()?;
658            return Ok(QueryExpr::EventsBackfillStatus { collection });
659        }
660
661        let collection = self.expect_ident()?;
662        let where_filter = if self.consume(&Token::Where)? {
663            let mut parts = Vec::new();
664            while !self.check(&Token::Eof) && !self.check(&Token::To) {
665                parts.push(self.peek().to_string());
666                self.advance()?;
667            }
668            if parts.is_empty() {
669                return Err(ParseError::expected(
670                    vec!["predicate"],
671                    self.peek(),
672                    self.position(),
673                ));
674            }
675            Some(parts.join(" "))
676        } else {
677            None
678        };
679
680        self.expect(Token::To)?;
681        let target_queue = self.expect_ident()?;
682        let limit = if self.consume(&Token::Limit)? {
683            Some(self.parse_positive_integer("LIMIT")? as u64)
684        } else {
685            None
686        };
687
688        Ok(QueryExpr::EventsBackfill(EventsBackfillQuery {
689            collection,
690            where_filter,
691            target_queue,
692            limit,
693        }))
694    }
695
696    /// Parse an optional `OPTIONS (key 'value', key2 'value2', ...)` clause
697    /// used by Phase 3.2 FDW DDL statements. Returns an empty vec when the
698    /// clause is absent. Values are always single-quoted string literals —
699    /// consistent with PG's generic-options model.
700    pub(crate) fn parse_fdw_options_clause(&mut self) -> Result<Vec<(String, String)>, ParseError> {
701        if !self.consume(&Token::Options)? {
702            return Ok(Vec::new());
703        }
704        self.expect(Token::LParen)?;
705        let mut out: Vec<(String, String)> = Vec::new();
706        loop {
707            // Option keys frequently collide with reserved words
708            // (`path`, `format`, `delimiter`, `header`, …) — accept
709            // the keyword form and lowercase it so downstream
710            // option-name matching stays case-insensitive.
711            let was_ident = matches!(self.peek(), Token::Ident(_));
712            let raw = self.expect_ident_or_keyword()?;
713            let key = if was_ident {
714                raw
715            } else {
716                raw.to_ascii_lowercase()
717            };
718            // Value is a single-quoted string literal.
719            let value = self.parse_string()?;
720            out.push((key, value));
721            if !self.consume(&Token::Comma)? {
722                break;
723            }
724        }
725        self.expect(Token::RParen)?;
726        Ok(out)
727    }
728
729    /// Parse any top-level frontend statement through a single shared surface.
730    pub fn parse_frontend_statement(&mut self) -> Result<FrontendStatement, ParseError> {
731        match self.peek() {
732            Token::Select => match self.parse_select_query()? {
733                QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
734                    SqlQuery::Select(query),
735                ))),
736                QueryExpr::Join(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
737                    SqlQuery::Join(query),
738                ))),
739                QueryExpr::QueueSelect(query) => Ok(FrontendStatement::QueueSelect(query)),
740                other => Err(ParseError::new(
741                    format!("internal: SELECT produced unexpected query kind {other:?}"),
742                    self.position(),
743                )),
744            },
745            Token::From
746            | Token::Insert
747            | Token::Update
748            | Token::Truncate
749            | Token::Create
750            | Token::Drop
751            | Token::Alter
752            | Token::Set
753            | Token::Begin
754            | Token::Commit
755            | Token::Rollback
756            | Token::Savepoint
757            | Token::Release
758            | Token::Start
759            | Token::Vacuum
760            | Token::Analyze
761            | Token::Copy
762            | Token::Refresh => self.parse_sql_statement().map(FrontendStatement::Sql),
763            Token::Explain => {
764                if matches!(
765                    self.peek_next()?,
766                    Token::Ident(name) if name.eq_ignore_ascii_case("ASK")
767                ) {
768                    match self.parse_explain_ask_query()? {
769                        QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
770                        other => Err(ParseError::new(
771                            format!(
772                                "internal: EXPLAIN ASK produced unexpected query kind {other:?}"
773                            ),
774                            self.position(),
775                        )),
776                    }
777                } else {
778                    self.parse_sql_statement().map(FrontendStatement::Sql)
779                }
780            }
781            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
782                self.parse_sql_statement().map(FrontendStatement::Sql)
783            }
784            Token::Desc => self.parse_sql_statement().map(FrontendStatement::Sql),
785            Token::Ident(name)
786                if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
787            {
788                self.parse_sql_statement().map(FrontendStatement::Sql)
789            }
790            Token::Ident(name)
791                if name.eq_ignore_ascii_case("GRANT")
792                    || name.eq_ignore_ascii_case("REVOKE")
793                    || name.eq_ignore_ascii_case("SIMULATE")
794                    || name.eq_ignore_ascii_case("APPLY") =>
795            {
796                self.parse_sql_statement().map(FrontendStatement::Sql)
797            }
798            Token::Ident(name) if name.eq_ignore_ascii_case("WATCH") => {
799                self.advance()?;
800                if matches!(
801                    self.peek(),
802                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
803                ) {
804                    match self.parse_config_watch_after_watch()? {
805                        QueryExpr::ConfigCommand(command) => {
806                            Ok(FrontendStatement::ConfigCommand(command))
807                        }
808                        other => Err(ParseError::new(
809                            format!(
810                                "internal: WATCH CONFIG produced unexpected query kind {other:?}"
811                            ),
812                            self.position(),
813                        )),
814                    }
815                } else if matches!(
816                    self.peek(),
817                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
818                ) {
819                    match self.parse_vault_watch_after_watch()? {
820                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
821                        other => Err(ParseError::new(
822                            format!(
823                                "internal: WATCH VAULT produced unexpected query kind {other:?}"
824                            ),
825                            self.position(),
826                        )),
827                    }
828                } else {
829                    match self.parse_kv_watch(crate::catalog::CollectionModel::Kv)? {
830                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
831                        other => Err(ParseError::new(
832                            format!("internal: WATCH produced unexpected query kind {other:?}"),
833                            self.position(),
834                        )),
835                    }
836                }
837            }
838            Token::List => {
839                self.advance()?;
840                if matches!(
841                    self.peek(),
842                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
843                ) {
844                    match self.parse_config_list_after_list()? {
845                        QueryExpr::ConfigCommand(command) => {
846                            Ok(FrontendStatement::ConfigCommand(command))
847                        }
848                        other => Err(ParseError::new(
849                            format!(
850                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
851                            ),
852                            self.position(),
853                        )),
854                    }
855                } else if matches!(
856                    self.peek(),
857                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
858                ) {
859                    match self.parse_vault_list_after_list()? {
860                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
861                        other => Err(ParseError::new(
862                            format!(
863                                "internal: LIST VAULT produced unexpected query kind {other:?}"
864                            ),
865                            self.position(),
866                        )),
867                    }
868                } else {
869                    Err(ParseError::expected(
870                        vec!["CONFIG", "VAULT"],
871                        self.peek(),
872                        self.position(),
873                    ))
874                }
875            }
876            Token::Ident(name) if name.eq_ignore_ascii_case("LIST") => {
877                self.advance()?;
878                if matches!(
879                    self.peek(),
880                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
881                ) {
882                    match self.parse_config_list_after_list()? {
883                        QueryExpr::ConfigCommand(command) => {
884                            Ok(FrontendStatement::ConfigCommand(command))
885                        }
886                        other => Err(ParseError::new(
887                            format!(
888                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
889                            ),
890                            self.position(),
891                        )),
892                    }
893                } else if matches!(
894                    self.peek(),
895                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
896                ) {
897                    match self.parse_vault_list_after_list()? {
898                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
899                        other => Err(ParseError::new(
900                            format!(
901                                "internal: LIST VAULT produced unexpected query kind {other:?}"
902                            ),
903                            self.position(),
904                        )),
905                    }
906                } else {
907                    Err(ParseError::expected(
908                        vec!["CONFIG", "VAULT"],
909                        self.peek(),
910                        self.position(),
911                    ))
912                }
913            }
914            Token::Ident(name) if name.eq_ignore_ascii_case("INVALIDATE") => {
915                if matches!(
916                    self.peek_next()?,
917                    Token::Ident(next) if next.eq_ignore_ascii_case("CONFIG")
918                ) {
919                    match self.parse_config_command()? {
920                        QueryExpr::ConfigCommand(command) => {
921                            Ok(FrontendStatement::ConfigCommand(command))
922                        }
923                        other => Err(ParseError::new(
924                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
925                            self.position(),
926                        )),
927                    }
928                } else {
929                    self.advance()?;
930                    match self.parse_kv_invalidate_tags_after_invalidate()? {
931                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
932                        other => Err(ParseError::new(
933                            format!(
934                                "internal: INVALIDATE produced unexpected query kind {other:?}"
935                            ),
936                            self.position(),
937                        )),
938                    }
939                }
940            }
941            Token::Attach | Token::Detach => self.parse_sql_statement().map(FrontendStatement::Sql),
942            Token::Match => match self.parse_match_query()? {
943                QueryExpr::Graph(query) => Ok(FrontendStatement::Graph(query)),
944                other => Err(ParseError::new(
945                    format!("internal: MATCH produced unexpected query kind {other:?}"),
946                    self.position(),
947                )),
948            },
949            Token::Path => match self.parse_path_query()? {
950                QueryExpr::Path(query) => Ok(FrontendStatement::Path(query)),
951                other => Err(ParseError::new(
952                    format!("internal: PATH produced unexpected query kind {other:?}"),
953                    self.position(),
954                )),
955            },
956            Token::Vector => match self.parse_vector_query()? {
957                QueryExpr::Vector(query) => Ok(FrontendStatement::Vector(query)),
958                other => Err(ParseError::new(
959                    format!("internal: VECTOR produced unexpected query kind {other:?}"),
960                    self.position(),
961                )),
962            },
963            Token::Hybrid => match self.parse_hybrid_query()? {
964                QueryExpr::Hybrid(query) => Ok(FrontendStatement::Hybrid(query)),
965                other => Err(ParseError::new(
966                    format!("internal: HYBRID produced unexpected query kind {other:?}"),
967                    self.position(),
968                )),
969            },
970            Token::Graph => match self.parse_graph_command()? {
971                QueryExpr::GraphCommand(command) => Ok(FrontendStatement::GraphCommand(command)),
972                other => Err(ParseError::new(
973                    format!("internal: GRAPH produced unexpected query kind {other:?}"),
974                    self.position(),
975                )),
976            },
977            Token::Search => match self.parse_search_command()? {
978                QueryExpr::SearchCommand(command) => Ok(FrontendStatement::Search(command)),
979                other => Err(ParseError::new(
980                    format!("internal: SEARCH produced unexpected query kind {other:?}"),
981                    self.position(),
982                )),
983            },
984            Token::Ident(name) if name.eq_ignore_ascii_case("ASK") => {
985                match self.parse_ask_query()? {
986                    QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
987                    other => Err(ParseError::new(
988                        format!("internal: ASK produced unexpected query kind {other:?}"),
989                        self.position(),
990                    )),
991                }
992            }
993            Token::Ident(name) if name.eq_ignore_ascii_case("UNSEAL") => {
994                match self.parse_unseal_vault_command()? {
995                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
996                    other => Err(ParseError::new(
997                        format!("internal: UNSEAL VAULT produced unexpected query kind {other:?}"),
998                        self.position(),
999                    )),
1000                }
1001            }
1002            Token::Queue => match self.parse_queue_command()? {
1003                QueryExpr::QueueCommand(command) => Ok(FrontendStatement::QueueCommand(command)),
1004                other => Err(ParseError::new(
1005                    format!("internal: QUEUE produced unexpected query kind {other:?}"),
1006                    self.position(),
1007                )),
1008            },
1009            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
1010                match self.parse_events_command()? {
1011                    QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
1012                        SqlQuery::Select(query),
1013                    ))),
1014                    QueryExpr::EventsBackfill(query) => {
1015                        Ok(FrontendStatement::EventsBackfill(query))
1016                    }
1017                    QueryExpr::EventsBackfillStatus { collection } => {
1018                        Ok(FrontendStatement::EventsBackfillStatus { collection })
1019                    }
1020                    other => Err(ParseError::new(
1021                        format!("internal: EVENTS produced unexpected query kind {other:?}"),
1022                        self.position(),
1023                    )),
1024                }
1025            }
1026            Token::Kv => match self.parse_kv_command()? {
1027                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1028                other => Err(ParseError::new(
1029                    format!("internal: KV produced unexpected query kind {other:?}"),
1030                    self.position(),
1031                )),
1032            },
1033            Token::Delete => {
1034                if matches!(
1035                    self.peek_next()?,
1036                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
1037                ) {
1038                    match self.parse_config_command()? {
1039                        QueryExpr::ConfigCommand(command) => {
1040                            Ok(FrontendStatement::ConfigCommand(command))
1041                        }
1042                        other => Err(ParseError::new(
1043                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1044                            self.position(),
1045                        )),
1046                    }
1047                } else if matches!(
1048                    self.peek_next()?,
1049                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
1050                ) {
1051                    match self.parse_vault_lifecycle_command()? {
1052                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1053                        other => Err(ParseError::new(
1054                            format!("internal: VAULT produced unexpected query kind {other:?}"),
1055                            self.position(),
1056                        )),
1057                    }
1058                } else {
1059                    self.parse_sql_statement().map(FrontendStatement::Sql)
1060                }
1061            }
1062            Token::Add => match self.parse_config_command()? {
1063                QueryExpr::ConfigCommand(command) => Ok(FrontendStatement::ConfigCommand(command)),
1064                other => Err(ParseError::new(
1065                    format!("internal: CONFIG produced unexpected query kind {other:?}"),
1066                    self.position(),
1067                )),
1068            },
1069            Token::Purge => match self.parse_vault_lifecycle_command()? {
1070                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1071                other => Err(ParseError::new(
1072                    format!("internal: VAULT produced unexpected query kind {other:?}"),
1073                    self.position(),
1074                )),
1075            },
1076            Token::Ident(name)
1077                if name.eq_ignore_ascii_case("PUT")
1078                    || name.eq_ignore_ascii_case("GET")
1079                    || name.eq_ignore_ascii_case("RESOLVE")
1080                    || name.eq_ignore_ascii_case("ROTATE")
1081                    || name.eq_ignore_ascii_case("HISTORY")
1082                    || name.eq_ignore_ascii_case("PURGE")
1083                    || name.eq_ignore_ascii_case("INCR")
1084                    || name.eq_ignore_ascii_case("DECR")
1085                    || name.eq_ignore_ascii_case("INVALIDATE") =>
1086            {
1087                if matches!(
1088                    self.peek_next()?,
1089                    Token::Ident(next) if next.eq_ignore_ascii_case("VAULT")
1090                ) {
1091                    match self.parse_vault_lifecycle_command()? {
1092                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1093                        other => Err(ParseError::new(
1094                            format!("internal: VAULT produced unexpected query kind {other:?}"),
1095                            self.position(),
1096                        )),
1097                    }
1098                } else {
1099                    match self.parse_config_command()? {
1100                        QueryExpr::ConfigCommand(command) => {
1101                            Ok(FrontendStatement::ConfigCommand(command))
1102                        }
1103                        other => Err(ParseError::new(
1104                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1105                            self.position(),
1106                        )),
1107                    }
1108                }
1109            }
1110            Token::Ident(name) if name.eq_ignore_ascii_case("VAULT") => {
1111                match self.parse_vault_command()? {
1112                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1113                    other => Err(ParseError::new(
1114                        format!("internal: VAULT produced unexpected query kind {other:?}"),
1115                        self.position(),
1116                    )),
1117                }
1118            }
1119            Token::Tree => match self.parse_tree_command()? {
1120                QueryExpr::TreeCommand(command) => Ok(FrontendStatement::TreeCommand(command)),
1121                other => Err(ParseError::new(
1122                    format!("internal: TREE produced unexpected query kind {other:?}"),
1123                    self.position(),
1124                )),
1125            },
1126            Token::Ident(name) if name.eq_ignore_ascii_case("HLL") => {
1127                match self.parse_hll_command()? {
1128                    QueryExpr::ProbabilisticCommand(command) => {
1129                        Ok(FrontendStatement::ProbabilisticCommand(command))
1130                    }
1131                    other => Err(ParseError::new(
1132                        format!("internal: HLL produced unexpected query kind {other:?}"),
1133                        self.position(),
1134                    )),
1135                }
1136            }
1137            Token::Ident(name) if name.eq_ignore_ascii_case("SKETCH") => {
1138                match self.parse_sketch_command()? {
1139                    QueryExpr::ProbabilisticCommand(command) => {
1140                        Ok(FrontendStatement::ProbabilisticCommand(command))
1141                    }
1142                    other => Err(ParseError::new(
1143                        format!("internal: SKETCH produced unexpected query kind {other:?}"),
1144                        self.position(),
1145                    )),
1146                }
1147            }
1148            Token::Ident(name) if name.eq_ignore_ascii_case("FILTER") => {
1149                match self.parse_filter_command()? {
1150                    QueryExpr::ProbabilisticCommand(command) => {
1151                        Ok(FrontendStatement::ProbabilisticCommand(command))
1152                    }
1153                    other => Err(ParseError::new(
1154                        format!("internal: FILTER produced unexpected query kind {other:?}"),
1155                        self.position(),
1156                    )),
1157                }
1158            }
1159            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => self
1160                .parse_sql_command()
1161                .map(SqlCommand::into_statement)
1162                .map(FrontendStatement::Sql),
1163            other => Err(ParseError::expected(
1164                vec![
1165                    "SELECT", "MATCH", "PATH", "FROM", "VECTOR", "HYBRID", "INSERT", "UPDATE",
1166                    "DELETE", "TRUNCATE", "CREATE", "DROP", "ALTER", "GRAPH", "SEARCH", "ASK",
1167                    "QUEUE", "EVENTS", "KV", "HLL", "TREE", "SKETCH", "FILTER", "SET", "SHOW",
1168                    "DESCRIBE", "DESC",
1169                ],
1170                other,
1171                self.position(),
1172            )),
1173        }
1174    }
1175
1176    /// Parse any SQL/RQL-style command into the canonical SQL frontend IR.
1177    pub fn parse_sql_statement(&mut self) -> Result<SqlStatement, ParseError> {
1178        self.parse_sql_command().map(SqlCommand::into_statement)
1179    }
1180
1181    fn parse_dotted_admin_path(&mut self, lowercase: bool) -> Result<String, ParseError> {
1182        let mut path = self.expect_ident()?;
1183        while self.consume(&Token::Dot)? {
1184            let next = self.expect_ident_or_keyword()?;
1185            path = format!("{path}.{next}");
1186        }
1187        Ok(if lowercase {
1188            path.to_ascii_lowercase()
1189        } else {
1190            path
1191        })
1192    }
1193
1194    /// Parse any SQL/RQL-style command through a single frontend module.
1195    pub fn parse_sql_command(&mut self) -> Result<SqlCommand, ParseError> {
1196        match self.peek() {
1197            Token::Select => match self.parse_select_query()? {
1198                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1199                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1200                other => Err(ParseError::new(
1201                    format!("internal: SELECT produced unexpected query kind {other:?}"),
1202                    self.position(),
1203                )),
1204            },
1205            Token::From => match self.parse_from_query()? {
1206                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1207                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1208                other => Err(ParseError::new(
1209                    format!("internal: FROM produced unexpected query kind {other:?}"),
1210                    self.position(),
1211                )),
1212            },
1213            Token::Insert => match self.parse_insert_query()? {
1214                QueryExpr::Insert(query) => Ok(SqlCommand::Insert(query)),
1215                other => Err(ParseError::new(
1216                    format!("internal: INSERT produced unexpected query kind {other:?}"),
1217                    self.position(),
1218                )),
1219            },
1220            Token::Update => match self.parse_update_query()? {
1221                QueryExpr::Update(query) => Ok(SqlCommand::Update(query)),
1222                other => Err(ParseError::new(
1223                    format!("internal: UPDATE produced unexpected query kind {other:?}"),
1224                    self.position(),
1225                )),
1226            },
1227            Token::Delete => {
1228                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("SECRET"))
1229                {
1230                    self.advance()?; // DELETE
1231                    self.advance()?; // SECRET
1232                    let key = self.parse_dotted_admin_path(true)?;
1233                    Ok(SqlCommand::DeleteSecret { key })
1234                } else {
1235                    match self.parse_delete_query()? {
1236                        QueryExpr::Delete(query) => Ok(SqlCommand::Delete(query)),
1237                        other => Err(ParseError::new(
1238                            format!("internal: DELETE produced unexpected query kind {other:?}"),
1239                            self.position(),
1240                        )),
1241                    }
1242                }
1243            }
1244            Token::Truncate => {
1245                self.advance()?;
1246                let model = if self.consume(&Token::Table)? {
1247                    Some(CollectionModel::Table)
1248                } else if self.consume(&Token::Graph)? {
1249                    Some(CollectionModel::Graph)
1250                } else if self.consume(&Token::Vector)? {
1251                    Some(CollectionModel::Vector)
1252                } else if self.consume(&Token::Document)? {
1253                    Some(CollectionModel::Document)
1254                } else if self.consume(&Token::Timeseries)? {
1255                    Some(CollectionModel::TimeSeries)
1256                } else if self.consume_ident_ci("METRICS")? {
1257                    Some(CollectionModel::Metrics)
1258                } else if self.consume(&Token::Kv)? {
1259                    Some(CollectionModel::Kv)
1260                } else if self.consume(&Token::Queue)? {
1261                    Some(CollectionModel::Queue)
1262                } else if self.consume(&Token::Collection)? {
1263                    None
1264                } else {
1265                    return Err(ParseError::expected(
1266                        vec![
1267                            "TABLE",
1268                            "GRAPH",
1269                            "VECTOR",
1270                            "DOCUMENT",
1271                            "TIMESERIES",
1272                            "METRICS",
1273                            "KV",
1274                            "QUEUE",
1275                            "COLLECTION",
1276                        ],
1277                        self.peek(),
1278                        self.position(),
1279                    ));
1280                };
1281                match self.parse_truncate_body(model)? {
1282                    QueryExpr::Truncate(query) => Ok(SqlCommand::Truncate(query)),
1283                    other => Err(ParseError::new(
1284                        format!("internal: TRUNCATE produced unexpected kind {other:?}"),
1285                        self.position(),
1286                    )),
1287                }
1288            }
1289            Token::Explain => {
1290                // Peek ahead: EXPLAIN MIGRATION name → ExplainMigration
1291                // EXPLAIN ALTER FOR ... → ExplainAlter (existing path)
1292                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
1293                {
1294                    self.advance()?; // consume EXPLAIN
1295                    match self.parse_explain_migration_after_keyword()? {
1296                        QueryExpr::ExplainMigration(q) => Ok(SqlCommand::ExplainMigration(q)),
1297                        other => Err(ParseError::new(
1298                            format!(
1299                                "internal: EXPLAIN MIGRATION produced unexpected kind {other:?}"
1300                            ),
1301                            self.position(),
1302                        )),
1303                    }
1304                } else {
1305                    match self.parse_explain_alter_query()? {
1306                        QueryExpr::ExplainAlter(query) => Ok(SqlCommand::ExplainAlter(query)),
1307                        other => Err(ParseError::new(
1308                            format!("internal: EXPLAIN produced unexpected query kind {other:?}"),
1309                            self.position(),
1310                        )),
1311                    }
1312                }
1313            }
1314            Token::Create => {
1315                let pos = self.position();
1316                self.advance()?;
1317
1318                // CREATE [OR REPLACE] [MATERIALIZED] VIEW [IF NOT EXISTS] name AS <select>
1319                // Detect the VIEW path early so OR REPLACE / MATERIALIZED modifiers
1320                // don't collide with other CREATE variants (TABLE, INDEX, etc.).
1321                let mut or_replace = false;
1322                if self.consume(&Token::Or)? || self.consume_ident_ci("OR")? {
1323                    let _ = self.consume_ident_ci("REPLACE")?;
1324                    or_replace = true;
1325                }
1326                let materialized = self.consume(&Token::Materialized)?;
1327                if self.check(&Token::View) {
1328                    self.advance()?;
1329                    let if_not_exists = self.match_if_not_exists()?;
1330                    let name = self.expect_ident()?;
1331                    // Issue #584 slice 12 — `WITH RETENTION <duration>`
1332                    // on CREATE MATERIALIZED VIEW. Parsed before `AS`
1333                    // so the SELECT body parser cannot consume the
1334                    // trailing `WITH` for its own (TTL / METADATA /
1335                    // …) clauses. Persisted on the view definition;
1336                    // the physical sweep against view-backing rows
1337                    // activates with the slice-9 row-storage follow-up.
1338                    let mut retention_duration_ms: Option<u64> = None;
1339                    if self.check(&Token::With) {
1340                        self.advance()?;
1341                        if !self.consume(&Token::Retention)?
1342                            && !self.consume_ident_ci("RETENTION")?
1343                        {
1344                            return Err(ParseError::expected(
1345                                vec!["RETENTION"],
1346                                self.peek(),
1347                                self.position(),
1348                            ));
1349                        }
1350                        if !materialized {
1351                            return Err(ParseError::new(
1352                                "WITH RETENTION is only valid on \
1353                                 CREATE MATERIALIZED VIEW"
1354                                    .to_string(),
1355                                self.position(),
1356                            ));
1357                        }
1358                        let value = self.parse_float()?;
1359                        let unit_mult = self.parse_duration_unit()?;
1360                        retention_duration_ms = Some((value * unit_mult).round() as u64);
1361                    }
1362                    // Accept `AS` — the lexer promotes it to `Token::As`
1363                    // (keyword) but some paths still see it as an ident.
1364                    if !self.consume(&Token::As)? && !self.consume_ident_ci("AS")? {
1365                        return Err(ParseError::expected(
1366                            vec!["AS"],
1367                            self.peek(),
1368                            self.position(),
1369                        ));
1370                    }
1371                    // Recursive parse of the body. Any QueryExpr that the
1372                    // rest of the grammar accepts is valid (Select, Join, etc.).
1373                    let body = self.parse_sql_command()?.into_query_expr();
1374                    // Optional `REFRESH EVERY <duration>` clause on
1375                    // materialized views (issue #583 slice 10). The
1376                    // background scheduler reads this off the view
1377                    // descriptor and ticks the view on its cadence.
1378                    let mut refresh_every_ms: Option<u64> = None;
1379                    if self.check(&Token::Refresh) {
1380                        if !materialized {
1381                            return Err(ParseError::new(
1382                                "REFRESH EVERY is only valid on \
1383                                 CREATE MATERIALIZED VIEW"
1384                                    .to_string(),
1385                                self.position(),
1386                            ));
1387                        }
1388                        self.advance()?;
1389                        if !self.consume_ident_ci("EVERY")? {
1390                            return Err(ParseError::expected(
1391                                vec!["EVERY"],
1392                                self.peek(),
1393                                self.position(),
1394                            ));
1395                        }
1396                        let value = self.parse_float()?;
1397                        let unit_mult = self.parse_duration_unit()?;
1398                        refresh_every_ms = Some((value * unit_mult).round() as u64);
1399                    }
1400                    return Ok(SqlCommand::CreateView(CreateViewQuery {
1401                        name,
1402                        query: Box::new(body),
1403                        materialized,
1404                        if_not_exists,
1405                        or_replace,
1406                        refresh_every_ms,
1407                        retention_duration_ms,
1408                    }));
1409                }
1410                // If OR REPLACE / MATERIALIZED was consumed but VIEW was not,
1411                // bail out — no other CREATE form accepts those modifiers.
1412                if or_replace || materialized {
1413                    return Err(ParseError::expected(
1414                        vec!["VIEW"],
1415                        self.peek(),
1416                        self.position(),
1417                    ));
1418                }
1419
1420                if self.check(&Token::Index) || self.check(&Token::Unique) {
1421                    match self.parse_create_index_query()? {
1422                        QueryExpr::CreateIndex(query) => Ok(SqlCommand::CreateIndex(query)),
1423                        other => Err(ParseError::new(
1424                            format!("internal: CREATE INDEX produced unexpected kind {other:?}"),
1425                            self.position(),
1426                        )),
1427                    }
1428                } else if self.check(&Token::Table) {
1429                    self.expect(Token::Table)?;
1430                    match self.parse_create_table_body()? {
1431                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1432                        other => Err(ParseError::new(
1433                            format!("internal: CREATE TABLE produced unexpected kind {other:?}"),
1434                            self.position(),
1435                        )),
1436                    }
1437                } else if self.check(&Token::Graph) {
1438                    self.advance()?;
1439                    match self.parse_create_collection_model_body(CollectionModel::Graph)? {
1440                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1441                        other => Err(ParseError::new(
1442                            format!("internal: CREATE GRAPH produced unexpected kind {other:?}"),
1443                            self.position(),
1444                        )),
1445                    }
1446                } else if self.check(&Token::Document) {
1447                    self.advance()?;
1448                    match self.parse_create_collection_model_body(CollectionModel::Document)? {
1449                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1450                        other => Err(ParseError::new(
1451                            format!("internal: CREATE DOCUMENT produced unexpected kind {other:?}"),
1452                            self.position(),
1453                        )),
1454                    }
1455                } else if self.check(&Token::Vector) {
1456                    self.advance()?;
1457                    match self.parse_create_vector_body()? {
1458                        QueryExpr::CreateVector(query) => Ok(SqlCommand::CreateVector(query)),
1459                        other => Err(ParseError::new(
1460                            format!("internal: CREATE VECTOR produced unexpected kind {other:?}"),
1461                            self.position(),
1462                        )),
1463                    }
1464                } else if self.check(&Token::Collection) {
1465                    self.advance()?;
1466                    match self.parse_create_collection_body()? {
1467                        QueryExpr::CreateCollection(query) => {
1468                            Ok(SqlCommand::CreateCollection(query))
1469                        }
1470                        other => Err(ParseError::new(
1471                            format!(
1472                                "internal: CREATE COLLECTION produced unexpected kind {other:?}"
1473                            ),
1474                            self.position(),
1475                        )),
1476                    }
1477                } else if self.check(&Token::Kv) {
1478                    self.advance()?;
1479                    match self.parse_create_keyed_body(CollectionModel::Kv)? {
1480                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1481                        other => Err(ParseError::new(
1482                            format!("internal: CREATE KV produced unexpected kind {other:?}"),
1483                            self.position(),
1484                        )),
1485                    }
1486                } else if self.consume_ident_ci("CONFIG")? {
1487                    match self.parse_create_keyed_body(CollectionModel::Config)? {
1488                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1489                        other => Err(ParseError::new(
1490                            format!("internal: CREATE CONFIG produced unexpected kind {other:?}"),
1491                            self.position(),
1492                        )),
1493                    }
1494                } else if self.consume_ident_ci("VAULT")? {
1495                    match self.parse_create_keyed_body(CollectionModel::Vault)? {
1496                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1497                        other => Err(ParseError::new(
1498                            format!("internal: CREATE VAULT produced unexpected kind {other:?}"),
1499                            self.position(),
1500                        )),
1501                    }
1502                } else if self.check(&Token::Timeseries) {
1503                    self.advance()?;
1504                    match self.parse_create_timeseries_body()? {
1505                        QueryExpr::CreateTimeSeries(query) => {
1506                            Ok(SqlCommand::CreateTimeSeries(query))
1507                        }
1508                        other => Err(ParseError::new(
1509                            format!(
1510                                "internal: CREATE TIMESERIES produced unexpected kind {other:?}"
1511                            ),
1512                            self.position(),
1513                        )),
1514                    }
1515                } else if self.consume_ident_ci("METRICS")? {
1516                    match self.parse_create_metrics_body()? {
1517                        QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1518                        other => Err(ParseError::new(
1519                            format!("internal: CREATE METRICS produced unexpected kind {other:?}"),
1520                            self.position(),
1521                        )),
1522                    }
1523                } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
1524                {
1525                    self.advance()?;
1526                    match self.parse_create_hypertable_body()? {
1527                        QueryExpr::CreateTimeSeries(query) => {
1528                            Ok(SqlCommand::CreateTimeSeries(query))
1529                        }
1530                        other => Err(ParseError::new(
1531                            format!(
1532                                "internal: CREATE HYPERTABLE produced unexpected kind {other:?}"
1533                            ),
1534                            self.position(),
1535                        )),
1536                    }
1537                } else if self.check(&Token::Queue) {
1538                    self.advance()?;
1539                    match self.parse_create_queue_body()? {
1540                        QueryExpr::CreateQueue(query) => Ok(SqlCommand::CreateQueue(query)),
1541                        other => Err(ParseError::new(
1542                            format!("internal: CREATE QUEUE produced unexpected kind {other:?}"),
1543                            self.position(),
1544                        )),
1545                    }
1546                } else if self.check(&Token::Tree) {
1547                    self.advance()?;
1548                    match self.parse_create_tree_body()? {
1549                        QueryExpr::CreateTree(query) => Ok(SqlCommand::CreateTree(query)),
1550                        other => Err(ParseError::new(
1551                            format!("internal: CREATE TREE produced unexpected kind {other:?}"),
1552                            self.position(),
1553                        )),
1554                    }
1555                } else if matches!(self.peek(), Token::Ident(n) if
1556                    n.eq_ignore_ascii_case("HLL") ||
1557                    n.eq_ignore_ascii_case("SKETCH") ||
1558                    n.eq_ignore_ascii_case("FILTER"))
1559                {
1560                    match self.parse_create_probabilistic()? {
1561                        QueryExpr::ProbabilisticCommand(command) => {
1562                            Ok(SqlCommand::Probabilistic(command))
1563                        }
1564                        other => Err(ParseError::new(
1565                            format!(
1566                                "internal: CREATE probabilistic produced unexpected kind {other:?}"
1567                            ),
1568                            self.position(),
1569                        )),
1570                    }
1571                } else if self.check(&Token::Schema) {
1572                    // CREATE SCHEMA [IF NOT EXISTS] name
1573                    self.advance()?;
1574                    let if_not_exists = self.match_if_not_exists()?;
1575                    let name = self.expect_ident()?;
1576                    Ok(SqlCommand::CreateSchema(CreateSchemaQuery {
1577                        name,
1578                        if_not_exists,
1579                    }))
1580                } else if self.check(&Token::Policy) {
1581                    // Two forms share the leading `CREATE POLICY` tokens:
1582                    //   * IAM:   CREATE POLICY '<id>' AS '<json>'          (string literal id)
1583                    //   * RLS:   CREATE POLICY <name> ON <target> ...      (bare ident name)
1584                    // Disambiguate by peeking the token after POLICY.
1585                    self.advance()?;
1586                    if matches!(self.peek(), Token::String(_)) {
1587                        // IAM form — short-circuit out of the SQL command stack.
1588                        let expr = self.parse_create_iam_policy_after_keywords()?;
1589                        // Inline command-wrapping: produce a synthetic SqlCommand by
1590                        // routing through a generic IAM admin holder. We don't
1591                        // have a dedicated SqlCommand variant for IAM yet, so we
1592                        // bounce through the existing Grant-shaped Admin slot
1593                        // which expects no further tokens.
1594                        return Ok(SqlCommand::IamPolicy(expr));
1595                    }
1596                    let name = self.expect_ident()?;
1597                    self.expect(Token::On)?;
1598
1599                    let (target_kind, table) = {
1600                        use crate::storage::query::ast::PolicyTargetKind;
1601                        let kw = match self.peek() {
1602                            Token::Ident(s) => Some(s.to_ascii_uppercase()),
1603                            _ => None,
1604                        };
1605                        let kind = kw.as_deref().and_then(|k| match k {
1606                            "NODES" => Some(PolicyTargetKind::Nodes),
1607                            "EDGES" => Some(PolicyTargetKind::Edges),
1608                            "VECTORS" => Some(PolicyTargetKind::Vectors),
1609                            "MESSAGES" => Some(PolicyTargetKind::Messages),
1610                            "POINTS" => Some(PolicyTargetKind::Points),
1611                            "DOCUMENTS" => Some(PolicyTargetKind::Documents),
1612                            _ => None,
1613                        });
1614                        if let Some(k) = kind {
1615                            self.advance()?;
1616                            self.expect(Token::Of)?;
1617                            let coll = self.expect_ident()?;
1618                            (k, coll)
1619                        } else {
1620                            let coll = self.expect_ident()?;
1621                            (PolicyTargetKind::Table, coll)
1622                        }
1623                    };
1624
1625                    let action = if self.consume(&Token::For)? {
1626                        let a = match self.peek() {
1627                            Token::Select => {
1628                                self.advance()?;
1629                                Some(PolicyAction::Select)
1630                            }
1631                            Token::Insert => {
1632                                self.advance()?;
1633                                Some(PolicyAction::Insert)
1634                            }
1635                            Token::Update => {
1636                                self.advance()?;
1637                                Some(PolicyAction::Update)
1638                            }
1639                            Token::Delete => {
1640                                self.advance()?;
1641                                Some(PolicyAction::Delete)
1642                            }
1643                            Token::All => {
1644                                self.advance()?;
1645                                None
1646                            }
1647                            _ => None,
1648                        };
1649                        a
1650                    } else {
1651                        None
1652                    };
1653
1654                    let role = if self.consume(&Token::To)? {
1655                        Some(self.expect_ident()?)
1656                    } else {
1657                        None
1658                    };
1659
1660                    self.expect(Token::Using)?;
1661                    self.expect(Token::LParen)?;
1662                    let filter = self.parse_filter()?;
1663                    self.expect(Token::RParen)?;
1664
1665                    Ok(SqlCommand::CreatePolicy(CreatePolicyQuery {
1666                        name,
1667                        table,
1668                        action,
1669                        role,
1670                        using: Box::new(filter),
1671                        target_kind,
1672                    }))
1673                } else if self.check(&Token::Server) {
1674                    // CREATE SERVER [IF NOT EXISTS] name
1675                    //   FOREIGN DATA WRAPPER kind
1676                    //   [OPTIONS (key 'value', ...)]
1677                    self.advance()?;
1678                    let if_not_exists = self.match_if_not_exists()?;
1679                    let name = self.expect_ident()?;
1680                    self.expect(Token::Foreign)?;
1681                    self.expect(Token::Data)?;
1682                    self.expect(Token::Wrapper)?;
1683                    let wrapper = self.expect_ident()?;
1684                    let options = self.parse_fdw_options_clause()?;
1685                    Ok(SqlCommand::CreateServer(CreateServerQuery {
1686                        name,
1687                        wrapper,
1688                        options,
1689                        if_not_exists,
1690                    }))
1691                } else if self.check(&Token::Foreign) {
1692                    // CREATE FOREIGN TABLE [IF NOT EXISTS] name (cols)
1693                    //   SERVER server_name
1694                    //   [OPTIONS (key 'value', ...)]
1695                    self.advance()?;
1696                    self.expect(Token::Table)?;
1697                    let if_not_exists = self.match_if_not_exists()?;
1698                    let name = self.expect_ident()?;
1699                    self.expect(Token::LParen)?;
1700                    let mut columns = Vec::new();
1701                    loop {
1702                        let col_name = self.expect_ident()?;
1703                        let data_type = self.expect_ident_or_keyword()?;
1704                        // Inline NOT NULL check — the CREATE TABLE path's helper is
1705                        // private and coupling to it just for FDW columns isn't worth it.
1706                        let mut not_null = false;
1707                        if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NOT")) {
1708                            self.advance()?;
1709                            if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NULL"))
1710                            {
1711                                self.advance()?;
1712                                not_null = true;
1713                            }
1714                        }
1715                        columns.push(ForeignColumnDef {
1716                            name: col_name,
1717                            data_type,
1718                            not_null,
1719                        });
1720                        if !self.consume(&Token::Comma)? {
1721                            break;
1722                        }
1723                    }
1724                    self.expect(Token::RParen)?;
1725                    self.expect(Token::Server)?;
1726                    let server = self.expect_ident()?;
1727                    let options = self.parse_fdw_options_clause()?;
1728                    Ok(SqlCommand::CreateForeignTable(CreateForeignTableQuery {
1729                        name,
1730                        server,
1731                        columns,
1732                        options,
1733                        if_not_exists,
1734                    }))
1735                } else if self.check(&Token::Sequence) {
1736                    // CREATE SEQUENCE [IF NOT EXISTS] name
1737                    //   [START [WITH] n] [INCREMENT [BY] n]
1738                    self.advance()?;
1739                    let if_not_exists = self.match_if_not_exists()?;
1740                    let name = self.expect_ident()?;
1741                    let mut start: i64 = 1;
1742                    let mut increment: i64 = 1;
1743                    // Loop over optional clauses in any order.
1744                    loop {
1745                        if self.consume(&Token::Start)? {
1746                            // Accept `START 100` or `START WITH 100`.
1747                            let _ = self.consume(&Token::With)? || self.consume_ident_ci("WITH")?;
1748                            start = self.parse_integer()?;
1749                        } else if self.consume(&Token::Increment)? {
1750                            // Accept `INCREMENT 5` or `INCREMENT BY 5`.
1751                            let _ = self.consume(&Token::By)? || self.consume_ident_ci("BY")?;
1752                            increment = self.parse_integer()?;
1753                        } else {
1754                            break;
1755                        }
1756                    }
1757                    Ok(SqlCommand::CreateSequence(CreateSequenceQuery {
1758                        name,
1759                        if_not_exists,
1760                        start,
1761                        increment,
1762                    }))
1763                } else if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
1764                {
1765                    self.advance()?; // consume MIGRATION
1766                    match self.parse_create_migration_body()? {
1767                        QueryExpr::CreateMigration(q) => Ok(SqlCommand::CreateMigration(q)),
1768                        other => Err(ParseError::new(
1769                            format!(
1770                                "internal: CREATE MIGRATION produced unexpected kind {other:?}"
1771                            ),
1772                            self.position(),
1773                        )),
1774                    }
1775                } else if let Some(err) =
1776                    ParseError::unsupported_recognized_token(self.peek(), self.position())
1777                {
1778                    Err(err)
1779                } else {
1780                    Err(ParseError::expected(
1781                        vec![
1782                            "TABLE",
1783                            "GRAPH",
1784                            "VECTOR",
1785                            "DOCUMENT",
1786                            "KV",
1787                            "COLLECTION",
1788                            "INDEX",
1789                            "UNIQUE",
1790                            "TIMESERIES",
1791                            "QUEUE",
1792                            "TREE",
1793                            "HLL",
1794                            "SKETCH",
1795                            "FILTER",
1796                            "SCHEMA",
1797                            "SEQUENCE",
1798                            "MIGRATION",
1799                        ],
1800                        self.peek(),
1801                        pos,
1802                    ))
1803                }
1804            }
1805            Token::Drop => {
1806                let pos = self.position();
1807                self.advance()?;
1808
1809                // DROP [MATERIALIZED] VIEW [IF EXISTS] name
1810                let materialized = self.consume(&Token::Materialized)?;
1811                if self.check(&Token::View) {
1812                    self.advance()?;
1813                    let if_exists = self.match_if_exists()?;
1814                    let name = self.expect_ident()?;
1815                    return Ok(SqlCommand::DropView(DropViewQuery {
1816                        name,
1817                        materialized,
1818                        if_exists,
1819                    }));
1820                }
1821                if materialized {
1822                    return Err(ParseError::expected(
1823                        vec!["VIEW"],
1824                        self.peek(),
1825                        self.position(),
1826                    ));
1827                }
1828
1829                if self.check(&Token::Index) {
1830                    match self.parse_drop_index_query()? {
1831                        QueryExpr::DropIndex(query) => Ok(SqlCommand::DropIndex(query)),
1832                        other => Err(ParseError::new(
1833                            format!("internal: DROP INDEX produced unexpected kind {other:?}"),
1834                            self.position(),
1835                        )),
1836                    }
1837                } else if self.check(&Token::Table) {
1838                    self.expect(Token::Table)?;
1839                    match self.parse_drop_table_body()? {
1840                        QueryExpr::DropTable(query) => Ok(SqlCommand::DropTable(query)),
1841                        other => Err(ParseError::new(
1842                            format!("internal: DROP TABLE produced unexpected kind {other:?}"),
1843                            self.position(),
1844                        )),
1845                    }
1846                } else if self.check(&Token::Graph) {
1847                    self.advance()?;
1848                    match self.parse_drop_graph_body()? {
1849                        QueryExpr::DropGraph(query) => Ok(SqlCommand::DropGraph(query)),
1850                        other => Err(ParseError::new(
1851                            format!("internal: DROP GRAPH produced unexpected kind {other:?}"),
1852                            self.position(),
1853                        )),
1854                    }
1855                } else if self.check(&Token::Vector) {
1856                    self.advance()?;
1857                    match self.parse_drop_vector_body()? {
1858                        QueryExpr::DropVector(query) => Ok(SqlCommand::DropVector(query)),
1859                        other => Err(ParseError::new(
1860                            format!("internal: DROP VECTOR produced unexpected kind {other:?}"),
1861                            self.position(),
1862                        )),
1863                    }
1864                } else if self.check(&Token::Document) {
1865                    self.advance()?;
1866                    match self.parse_drop_document_body()? {
1867                        QueryExpr::DropDocument(query) => Ok(SqlCommand::DropDocument(query)),
1868                        other => Err(ParseError::new(
1869                            format!("internal: DROP DOCUMENT produced unexpected kind {other:?}"),
1870                            self.position(),
1871                        )),
1872                    }
1873                } else if self.check(&Token::Kv) {
1874                    self.advance()?;
1875                    match self.parse_drop_kv_body()? {
1876                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1877                        other => Err(ParseError::new(
1878                            format!("internal: DROP KV produced unexpected kind {other:?}"),
1879                            self.position(),
1880                        )),
1881                    }
1882                } else if self.consume_ident_ci("CONFIG")? {
1883                    match self.parse_drop_keyed_body(CollectionModel::Config)? {
1884                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1885                        other => Err(ParseError::new(
1886                            format!("internal: DROP CONFIG produced unexpected kind {other:?}"),
1887                            self.position(),
1888                        )),
1889                    }
1890                } else if self.consume_ident_ci("VAULT")? {
1891                    match self.parse_drop_keyed_body(CollectionModel::Vault)? {
1892                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1893                        other => Err(ParseError::new(
1894                            format!("internal: DROP VAULT produced unexpected kind {other:?}"),
1895                            self.position(),
1896                        )),
1897                    }
1898                } else if self.check(&Token::Collection) {
1899                    self.advance()?;
1900                    match self.parse_drop_collection_body()? {
1901                        QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
1902                        other => Err(ParseError::new(
1903                            format!("internal: DROP COLLECTION produced unexpected kind {other:?}"),
1904                            self.position(),
1905                        )),
1906                    }
1907                } else if self.check(&Token::Timeseries) {
1908                    self.advance()?;
1909                    match self.parse_drop_timeseries_body()? {
1910                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
1911                        other => Err(ParseError::new(
1912                            format!("internal: DROP TIMESERIES produced unexpected kind {other:?}"),
1913                            self.position(),
1914                        )),
1915                    }
1916                } else if self.consume_ident_ci("METRICS")? {
1917                    match self.parse_drop_collection_model_body(Some(CollectionModel::Metrics))? {
1918                        QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
1919                        other => Err(ParseError::new(
1920                            format!("internal: DROP METRICS produced unexpected kind {other:?}"),
1921                            self.position(),
1922                        )),
1923                    }
1924                } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
1925                {
1926                    // DROP HYPERTABLE name reuses the same AST as
1927                    // DROP TIMESERIES — runtime clears the registry
1928                    // entry *and* drops the backing collection.
1929                    self.advance()?;
1930                    match self.parse_drop_timeseries_body()? {
1931                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
1932                        other => Err(ParseError::new(
1933                            format!("internal: DROP HYPERTABLE produced unexpected kind {other:?}"),
1934                            self.position(),
1935                        )),
1936                    }
1937                } else if self.check(&Token::Queue) {
1938                    self.advance()?;
1939                    match self.parse_drop_queue_body()? {
1940                        QueryExpr::DropQueue(query) => Ok(SqlCommand::DropQueue(query)),
1941                        other => Err(ParseError::new(
1942                            format!("internal: DROP QUEUE produced unexpected kind {other:?}"),
1943                            self.position(),
1944                        )),
1945                    }
1946                } else if self.check(&Token::Tree) {
1947                    self.advance()?;
1948                    match self.parse_drop_tree_body()? {
1949                        QueryExpr::DropTree(query) => Ok(SqlCommand::DropTree(query)),
1950                        other => Err(ParseError::new(
1951                            format!("internal: DROP TREE produced unexpected kind {other:?}"),
1952                            self.position(),
1953                        )),
1954                    }
1955                } else if matches!(self.peek(), Token::Ident(n) if
1956                    n.eq_ignore_ascii_case("HLL") ||
1957                    n.eq_ignore_ascii_case("SKETCH") ||
1958                    n.eq_ignore_ascii_case("FILTER"))
1959                {
1960                    match self.parse_drop_probabilistic()? {
1961                        QueryExpr::ProbabilisticCommand(command) => {
1962                            Ok(SqlCommand::Probabilistic(command))
1963                        }
1964                        other => Err(ParseError::new(
1965                            format!(
1966                                "internal: DROP probabilistic produced unexpected kind {other:?}"
1967                            ),
1968                            self.position(),
1969                        )),
1970                    }
1971                } else if self.check(&Token::Schema) {
1972                    // DROP SCHEMA [IF EXISTS] name [CASCADE]
1973                    self.advance()?;
1974                    let if_exists = self.match_if_exists()?;
1975                    let name = self.expect_ident()?;
1976                    let cascade = self.consume(&Token::Cascade)?;
1977                    Ok(SqlCommand::DropSchema(DropSchemaQuery {
1978                        name,
1979                        if_exists,
1980                        cascade,
1981                    }))
1982                } else if self.check(&Token::Policy) {
1983                    // Two forms:
1984                    //   * IAM:   DROP POLICY '<id>'
1985                    //   * RLS:   DROP POLICY [IF EXISTS] name ON table
1986                    self.advance()?;
1987                    if matches!(self.peek(), Token::String(_)) {
1988                        let expr = self.parse_drop_iam_policy_after_keywords()?;
1989                        return Ok(SqlCommand::IamPolicy(expr));
1990                    }
1991                    let if_exists = self.match_if_exists()?;
1992                    let name = self.expect_ident()?;
1993                    self.expect(Token::On)?;
1994                    let table = self.expect_ident()?;
1995                    Ok(SqlCommand::DropPolicy(DropPolicyQuery {
1996                        name,
1997                        table,
1998                        if_exists,
1999                    }))
2000                } else if self.check(&Token::Server) {
2001                    // DROP SERVER [IF EXISTS] name [CASCADE]
2002                    self.advance()?;
2003                    let if_exists = self.match_if_exists()?;
2004                    let name = self.expect_ident()?;
2005                    let cascade = self.consume(&Token::Cascade)?;
2006                    Ok(SqlCommand::DropServer(DropServerQuery {
2007                        name,
2008                        if_exists,
2009                        cascade,
2010                    }))
2011                } else if self.check(&Token::Foreign) {
2012                    // DROP FOREIGN TABLE [IF EXISTS] name
2013                    self.advance()?;
2014                    self.expect(Token::Table)?;
2015                    let if_exists = self.match_if_exists()?;
2016                    let name = self.expect_ident()?;
2017                    Ok(SqlCommand::DropForeignTable(DropForeignTableQuery {
2018                        name,
2019                        if_exists,
2020                    }))
2021                } else if self.check(&Token::Sequence) {
2022                    // DROP SEQUENCE [IF EXISTS] name
2023                    self.advance()?;
2024                    let if_exists = self.match_if_exists()?;
2025                    let name = self.expect_ident()?;
2026                    Ok(SqlCommand::DropSequence(DropSequenceQuery {
2027                        name,
2028                        if_exists,
2029                    }))
2030                } else if let Some(err) =
2031                    ParseError::unsupported_recognized_token(self.peek(), self.position())
2032                {
2033                    Err(err)
2034                } else {
2035                    Err(ParseError::expected(
2036                        vec![
2037                            "TABLE",
2038                            "INDEX",
2039                            "TIMESERIES",
2040                            "QUEUE",
2041                            "TREE",
2042                            "HLL",
2043                            "SKETCH",
2044                            "FILTER",
2045                            "SCHEMA",
2046                            "SEQUENCE",
2047                        ],
2048                        self.peek(),
2049                        pos,
2050                    ))
2051                }
2052            }
2053            Token::Alter => {
2054                // Disambiguate ALTER USER / ALTER QUEUE / ALTER TABLE without
2055                // committing to a path until we've seen the target.
2056                // We peek the *next* token (without consuming) and
2057                // dispatch accordingly.
2058                let next = self.peek_next()?.clone();
2059                if matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("USER")) {
2060                    self.advance()?; // consume ALTER
2061                    let stmt = self.parse_alter_user_statement()?;
2062                    Ok(SqlCommand::AlterUser(stmt))
2063                } else if matches!(next, Token::Queue) {
2064                    self.advance()?; // consume ALTER
2065                    self.advance()?; // consume QUEUE
2066                    match self.parse_alter_queue_body()? {
2067                        QueryExpr::AlterQueue(query) => Ok(SqlCommand::AlterQueue(query)),
2068                        other => Err(ParseError::new(
2069                            format!("internal: ALTER QUEUE produced unexpected kind {other:?}"),
2070                            self.position(),
2071                        )),
2072                    }
2073                } else if matches!(next, Token::Table)
2074                    || matches!(next, Token::Collection)
2075                    || matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("COLLECTION"))
2076                {
2077                    // Issue #522 — `ALTER COLLECTION` shares the AlterTable
2078                    // AST so signer-registry mutations dispatch through the
2079                    // existing executor. The DDL parser body accepts either
2080                    // keyword interchangeably for the open-vocabulary alters
2081                    // we own (currently `ADD|REVOKE SIGNER`).
2082                    match self.parse_alter_table_query()? {
2083                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2084                        other => Err(ParseError::new(
2085                            format!(
2086                                "internal: ALTER TABLE produced unexpected query kind {other:?}"
2087                            ),
2088                            self.position(),
2089                        )),
2090                    }
2091                } else if let Some(err) =
2092                    ParseError::unsupported_recognized_token(&next, self.position())
2093                {
2094                    Err(err)
2095                } else {
2096                    match self.parse_alter_table_query()? {
2097                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2098                        other => Err(ParseError::new(
2099                            format!("internal: ALTER produced unexpected query kind {other:?}"),
2100                            self.position(),
2101                        )),
2102                    }
2103                }
2104            }
2105            Token::Ident(name) if name.eq_ignore_ascii_case("GRANT") => {
2106                let stmt = self.parse_grant_statement()?;
2107                Ok(SqlCommand::Grant(stmt))
2108            }
2109            Token::Ident(name) if name.eq_ignore_ascii_case("REVOKE") => {
2110                let stmt = self.parse_revoke_statement()?;
2111                Ok(SqlCommand::Revoke(stmt))
2112            }
2113            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
2114                self.advance()?;
2115                if self.consume_ident_ci("BACKFILL")? {
2116                    return Err(ParseError::new(
2117                        "EVENTS BACKFILL STATUS is not implemented; EVENTS BACKFILL runtime is available but durable progress tracking is not"
2118                            .to_string(),
2119                        self.position(),
2120                    ));
2121                }
2122                if !self.consume_ident_ci("STATUS")? {
2123                    return Err(ParseError::expected(
2124                        vec!["STATUS"],
2125                        self.peek(),
2126                        self.position(),
2127                    ));
2128                }
2129
2130                let mut query = TableQuery::new("red.subscriptions");
2131                let collection = match self.peek().clone() {
2132                    Token::Ident(name) => {
2133                        self.advance()?;
2134                        Some(name)
2135                    }
2136                    Token::String(name) => {
2137                        self.advance()?;
2138                        Some(name)
2139                    }
2140                    _ => None,
2141                };
2142                self.parse_table_clauses(&mut query)?;
2143                if let Some(collection) = collection {
2144                    let filter = Filter::compare(
2145                        FieldRef::column("red.subscriptions", "collection"),
2146                        CompareOp::Eq,
2147                        Value::text(collection),
2148                    );
2149                    let expr = filter_to_expr(&filter);
2150                    query.where_expr = Some(match query.where_expr.take() {
2151                        Some(existing) => Expr::binop(BinOp::And, existing, expr),
2152                        None => expr,
2153                    });
2154                    query.filter = Some(match query.filter.take() {
2155                        Some(existing) => existing.and(filter),
2156                        None => filter,
2157                    });
2158                }
2159                Ok(SqlCommand::Select(query))
2160            }
2161            Token::Attach => {
2162                let expr = self.parse_attach_policy()?;
2163                Ok(SqlCommand::IamPolicy(expr))
2164            }
2165            Token::Detach => {
2166                let expr = self.parse_detach_policy()?;
2167                Ok(SqlCommand::IamPolicy(expr))
2168            }
2169            Token::Ident(name) if name.eq_ignore_ascii_case("SIMULATE") => {
2170                let expr = self.parse_simulate_policy()?;
2171                Ok(SqlCommand::IamPolicy(expr))
2172            }
2173            Token::Set => {
2174                self.advance()?;
2175                if self.consume_ident_ci("CONFIG")? {
2176                    let full_key = self.parse_dotted_admin_path(true)?;
2177                    self.expect(Token::Eq)?;
2178                    let value = self.parse_literal_value()?;
2179                    Ok(SqlCommand::SetConfig {
2180                        key: full_key,
2181                        value,
2182                    })
2183                } else if self.consume_ident_ci("SECRET")? {
2184                    let key = self.parse_dotted_admin_path(true)?;
2185                    self.expect(Token::Eq)?;
2186                    let value = self.parse_literal_value()?;
2187                    Ok(SqlCommand::SetSecret { key, value })
2188                } else if self.consume_ident_ci("TENANT")? {
2189                    // SET TENANT 'id'  |  SET TENANT = 'id'  |
2190                    // SET TENANT NULL  |  SET TENANT = NULL
2191                    let _ = self.consume(&Token::Eq)?;
2192                    if self.consume_ident_ci("NULL")? {
2193                        Ok(SqlCommand::SetTenant(None))
2194                    } else {
2195                        let value = self.parse_literal_value()?;
2196                        match value {
2197                            Value::Text(s) => Ok(SqlCommand::SetTenant(Some(s.to_string()))),
2198                            Value::Null => Ok(SqlCommand::SetTenant(None)),
2199                            other => Err(ParseError::new(
2200                                format!("SET TENANT expects a text literal or NULL, got {other:?}"),
2201                                self.position(),
2202                            )),
2203                        }
2204                    }
2205                } else {
2206                    Err(ParseError::expected(
2207                        vec!["CONFIG", "SECRET", "TENANT"],
2208                        self.peek(),
2209                        self.position(),
2210                    ))
2211                }
2212            }
2213            Token::Ident(name) if name.eq_ignore_ascii_case("APPLY") => {
2214                self.advance()?;
2215                match self.parse_apply_migration()? {
2216                    QueryExpr::ApplyMigration(q) => Ok(SqlCommand::ApplyMigration(q)),
2217                    other => Err(ParseError::new(
2218                        format!("internal: APPLY MIGRATION produced unexpected kind {other:?}"),
2219                        self.position(),
2220                    )),
2221                }
2222            }
2223            Token::Ident(name) if name.eq_ignore_ascii_case("RESET") => {
2224                // RESET TENANT — session-local clear
2225                self.advance()?;
2226                if self.consume_ident_ci("TENANT")? {
2227                    Ok(SqlCommand::SetTenant(None))
2228                } else {
2229                    Err(ParseError::expected(
2230                        vec!["TENANT"],
2231                        self.peek(),
2232                        self.position(),
2233                    ))
2234                }
2235            }
2236            Token::Ident(name)
2237                if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
2238            {
2239                self.advance()?;
2240                let collection = self.parse_dotted_admin_path(false)?;
2241                let mut query = TableQuery::new("red.describe");
2242                query.filter = Some(Filter::compare(
2243                    FieldRef::column("", "collection"),
2244                    CompareOp::Eq,
2245                    Value::text(collection),
2246                ));
2247                Ok(SqlCommand::Select(query))
2248            }
2249            Token::Desc => {
2250                self.advance()?;
2251                let collection = self.parse_dotted_admin_path(false)?;
2252                let mut query = TableQuery::new("red.describe");
2253                query.filter = Some(Filter::compare(
2254                    FieldRef::column("", "collection"),
2255                    CompareOp::Eq,
2256                    Value::text(collection),
2257                ));
2258                Ok(SqlCommand::Select(query))
2259            }
2260            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
2261                self.advance()?;
2262                if self.consume(&Token::Create)? || self.consume_ident_ci("CREATE")? {
2263                    if !(self.consume(&Token::Table)? || self.consume_ident_ci("TABLE")?) {
2264                        return Err(ParseError::expected(
2265                            vec!["TABLE"],
2266                            self.peek(),
2267                            self.position(),
2268                        ));
2269                    }
2270                    let collection = self.parse_dotted_admin_path(false)?;
2271                    let mut query = TableQuery::new("red.show_create");
2272                    query.filter = Some(Filter::compare(
2273                        FieldRef::column("", "collection"),
2274                        CompareOp::Eq,
2275                        Value::text(collection),
2276                    ));
2277                    Ok(SqlCommand::Select(query))
2278                } else if self.consume_ident_ci("CONFIG")? {
2279                    // Accept dotted prefixes the same way SET CONFIG does
2280                    // (`SHOW CONFIG durability.mode`), and empty prefix
2281                    // (`SHOW CONFIG`) for a catalog-wide listing.
2282                    let prefix = if !self.check(&Token::Eof) {
2283                        let first = self.expect_ident()?;
2284                        let mut full = first;
2285                        while self.consume(&Token::Dot)? {
2286                            let next = self.expect_ident_or_keyword()?;
2287                            full = format!("{full}.{next}");
2288                        }
2289                        // Match SET CONFIG: lowercase so keyword segments
2290                        // come out consistent with the stored keys.
2291                        Some(full.to_ascii_lowercase())
2292                    } else {
2293                        None
2294                    };
2295                    Ok(SqlCommand::ShowConfig { prefix })
2296                } else if self.consume_ident_ci("COLLECTIONS")? {
2297                    let mut query = TableQuery::new("red.collections");
2298                    let include_internal = if self.consume_ident_ci("INCLUDING")? {
2299                        if !self.consume_ident_ci("INTERNAL")? {
2300                            return Err(ParseError::expected(
2301                                vec!["INTERNAL"],
2302                                self.peek(),
2303                                self.position(),
2304                            ));
2305                        }
2306                        true
2307                    } else {
2308                        false
2309                    };
2310                    self.parse_table_clauses(&mut query)?;
2311                    if !include_internal {
2312                        let user_filter = query.filter.take();
2313                        let hide_internal = crate::storage::query::ast::Filter::Compare {
2314                            field: FieldRef::column("", "internal"),
2315                            op: CompareOp::Eq,
2316                            value: Value::Boolean(false),
2317                        };
2318                        query.filter = Some(match user_filter {
2319                            Some(filter) => filter.and(hide_internal),
2320                            None => hide_internal,
2321                        });
2322                    }
2323                    Ok(SqlCommand::Select(query))
2324                } else if self.consume_ident_ci("TABLES")? {
2325                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2326                        self, "table",
2327                    )?))
2328                } else if self.consume_ident_ci("QUEUES")? {
2329                    // Issue #535 — `SHOW QUEUES` desugars to the
2330                    // `red.queues` virtual table (queue-shaped
2331                    // columns), not the filtered `red.collections`
2332                    // view. `INCLUDING INTERNAL` mirrors the
2333                    // `SHOW COLLECTIONS` opt-in: without it, DLQ
2334                    // targets and other auto-created queues are
2335                    // hidden via the `internal = false` filter.
2336                    let mut query = TableQuery::new("red.queues");
2337                    let include_internal = if self.consume_ident_ci("INCLUDING")? {
2338                        if !self.consume_ident_ci("INTERNAL")? {
2339                            return Err(ParseError::expected(
2340                                vec!["INTERNAL"],
2341                                self.peek(),
2342                                self.position(),
2343                            ));
2344                        }
2345                        true
2346                    } else {
2347                        false
2348                    };
2349                    self.parse_table_clauses(&mut query)?;
2350                    if !include_internal {
2351                        let hide_internal = Filter::Compare {
2352                            field: FieldRef::column("", "internal"),
2353                            op: CompareOp::Eq,
2354                            value: Value::Boolean(false),
2355                        };
2356                        add_table_filter(&mut query, hide_internal);
2357                    }
2358                    Ok(SqlCommand::Select(query))
2359                } else if self.consume(&Token::Vectors)? || self.consume_ident_ci("VECTORS")? {
2360                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2361                        self, "vector",
2362                    )?))
2363                } else if self.consume_ident_ci("DOCUMENTS")? {
2364                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2365                        self, "document",
2366                    )?))
2367                } else if self.consume(&Token::Timeseries)?
2368                    || self.consume_ident_ci("TIMESERIES")?
2369                {
2370                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2371                        self,
2372                        "timeseries",
2373                    )?))
2374                } else if self.consume_ident_ci("GRAPHS")? {
2375                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2376                        self, "graph",
2377                    )?))
2378                } else if self.consume_ident_ci("CONFIGS")? {
2379                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2380                        self, "config",
2381                    )?))
2382                } else if self.consume_ident_ci("VAULTS")? {
2383                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2384                        self, "vault",
2385                    )?))
2386                } else if self.consume(&Token::Kv)?
2387                    || self.consume_ident_ci("KV")?
2388                    || self.consume_ident_ci("KVS")?
2389                {
2390                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2391                        self, "kv",
2392                    )?))
2393                } else if self.consume(&Token::Schema)? || self.consume_ident_ci("SCHEMA")? {
2394                    let collection = self.parse_dotted_admin_path(false)?;
2395                    let mut query = TableQuery::new("red.columns");
2396                    query.filter = Some(Filter::compare(
2397                        FieldRef::column("", "collection"),
2398                        CompareOp::Eq,
2399                        Value::text(collection),
2400                    ));
2401                    Ok(SqlCommand::Select(query))
2402                } else if self.consume_ident_ci("INDICES")? || self.consume_ident_ci("INDEXES")? {
2403                    let mut query = TableQuery::new("red.show_indexes");
2404                    if self.consume(&Token::On)? {
2405                        let collection = self.expect_ident_or_keyword()?;
2406                        let filter = Filter::Compare {
2407                            field: FieldRef::column("", "table"),
2408                            op: CompareOp::Eq,
2409                            value: Value::text(collection),
2410                        };
2411                        query.where_expr = Some(filter_to_expr(&filter));
2412                        query.filter = Some(filter);
2413                    }
2414                    self.parse_table_clauses(&mut query)?;
2415                    Ok(SqlCommand::Select(query))
2416                } else if self.consume_ident_ci("POLICIES")? {
2417                    if self.consume(&Token::For)? || self.consume_ident_ci("FOR")? {
2418                        let principal = self.parse_iam_principal_kind()?;
2419                        return Ok(SqlCommand::IamPolicy(QueryExpr::ShowPolicies {
2420                            filter: Some(principal),
2421                        }));
2422                    }
2423                    let mut query = TableQuery::new("red.policies");
2424                    let collection_filter =
2425                        if self.consume(&Token::On)? || self.consume_ident_ci("ON")? {
2426                            let collection = self.parse_dotted_admin_path(false)?;
2427                            Some(Filter::Compare {
2428                                field: FieldRef::TableColumn {
2429                                    table: String::new(),
2430                                    column: "collection".to_string(),
2431                                },
2432                                op: CompareOp::Eq,
2433                                value: Value::text(collection),
2434                            })
2435                        } else {
2436                            None
2437                        };
2438                    self.parse_table_clauses(&mut query)?;
2439                    if let Some(collection_filter) = collection_filter {
2440                        let combined = match query.filter.take() {
2441                            Some(existing) => {
2442                                Filter::And(Box::new(collection_filter), Box::new(existing))
2443                            }
2444                            None => collection_filter,
2445                        };
2446                        query.where_expr = Some(filter_to_expr(&combined));
2447                        query.filter = Some(combined);
2448                    }
2449                    Ok(SqlCommand::Select(query))
2450                } else if self.consume_ident_ci("STATS")? {
2451                    let mut query = TableQuery::new("red.stats");
2452                    let collection = match self.peek().clone() {
2453                        Token::Ident(name) => {
2454                            self.advance()?;
2455                            Some(name)
2456                        }
2457                        Token::String(name) => {
2458                            self.advance()?;
2459                            Some(name)
2460                        }
2461                        _ => None,
2462                    };
2463                    self.parse_table_clauses(&mut query)?;
2464                    if let Some(collection) = collection {
2465                        let filter = Filter::compare(
2466                            FieldRef::column("red.stats", "collection"),
2467                            CompareOp::Eq,
2468                            Value::text(collection),
2469                        );
2470                        let expr = filter_to_expr(&filter);
2471                        query.where_expr = Some(match query.where_expr.take() {
2472                            Some(existing) => Expr::binop(BinOp::And, existing, expr),
2473                            None => expr,
2474                        });
2475                        query.filter = Some(match query.filter.take() {
2476                            Some(existing) => existing.and(filter),
2477                            None => filter,
2478                        });
2479                    }
2480                    Ok(SqlCommand::Select(query))
2481                } else if self.consume_ident_ci("SAMPLE")? {
2482                    let mut query = TableQuery::new(&self.expect_ident()?);
2483                    query.limit = if self.consume(&Token::Limit)? {
2484                        Some(self.parse_integer()? as u64)
2485                    } else {
2486                        Some(10)
2487                    };
2488                    Ok(SqlCommand::Select(query))
2489                } else if self.consume_ident_ci("SECRET")? || self.consume_ident_ci("SECRETS")? {
2490                    let prefix = if !self.check(&Token::Eof) {
2491                        Some(self.parse_dotted_admin_path(true)?)
2492                    } else {
2493                        None
2494                    };
2495                    Ok(SqlCommand::ShowSecrets { prefix })
2496                } else if self.consume_ident_ci("TENANT")? {
2497                    Ok(SqlCommand::ShowTenant)
2498                } else if let Some(expr) = self.parse_show_iam_after_show()? {
2499                    Ok(SqlCommand::IamPolicy(expr))
2500                } else {
2501                    Err(ParseError::expected(
2502                        vec![
2503                            "CONFIG",
2504                            "SECRET",
2505                            "SECRETS",
2506                            "COLLECTIONS",
2507                            "TABLES",
2508                            "QUEUES",
2509                            "VECTORS",
2510                            "DOCUMENTS",
2511                            "TIMESERIES",
2512                            "GRAPHS",
2513                            "KV",
2514                            "SCHEMA",
2515                            "INDICES",
2516                            "INDEXES",
2517                            "SAMPLE",
2518                            "POLICIES",
2519                            "STATS",
2520                            "TENANT",
2521                            "EFFECTIVE",
2522                        ],
2523                        self.peek(),
2524                        self.position(),
2525                    ))
2526                }
2527            }
2528            // Transaction control statements (Phase 1.1 PG parity).
2529            // BEGIN [WORK | TRANSACTION] [ISOLATION LEVEL <mode>]
2530            // START TRANSACTION [ISOLATION LEVEL <mode>]
2531            //
2532            // We only implement SNAPSHOT ISOLATION (our default). We
2533            // accept READ UNCOMMITTED / READ COMMITTED / REPEATABLE
2534            // READ / SNAPSHOT as PG-compatible no-ops, but reject
2535            // SERIALIZABLE outright — the previous behaviour of
2536            // silently degrading to snapshot made the parser
2537            // dishonest. Real SSI (Serializable Snapshot Isolation)
2538            // is tracked as a future milestone.
2539            Token::Begin | Token::Start => {
2540                self.advance()?;
2541                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2542                // Optional ISOLATION LEVEL clause.
2543                if self.consume_ident_ci("ISOLATION")? {
2544                    self.expect(Token::Level)?;
2545                    // The level identifier can span multiple words
2546                    // (READ UNCOMMITTED / READ COMMITTED / REPEATABLE
2547                    // READ). Collect them case-insensitively.
2548                    let mut parts: Vec<String> = Vec::new();
2549                    if self.consume_ident_ci("READ")? {
2550                        parts.push("READ".to_string());
2551                        if self.consume_ident_ci("UNCOMMITTED")? {
2552                            parts.push("UNCOMMITTED".to_string());
2553                        } else if self.consume_ident_ci("COMMITTED")? {
2554                            parts.push("COMMITTED".to_string());
2555                        } else {
2556                            return Err(ParseError::expected(
2557                                vec!["UNCOMMITTED", "COMMITTED"],
2558                                self.peek(),
2559                                self.position(),
2560                            ));
2561                        }
2562                    } else if self.consume_ident_ci("REPEATABLE")? {
2563                        parts.push("REPEATABLE".to_string());
2564                        if !self.consume_ident_ci("READ")? {
2565                            return Err(ParseError::expected(
2566                                vec!["READ"],
2567                                self.peek(),
2568                                self.position(),
2569                            ));
2570                        }
2571                        parts.push("READ".to_string());
2572                    } else if self.consume_ident_ci("SNAPSHOT")? {
2573                        parts.push("SNAPSHOT".to_string());
2574                    } else if self.consume_ident_ci("SERIALIZABLE")? {
2575                        return Err(ParseError::new(
2576                            "ISOLATION LEVEL SERIALIZABLE is not yet supported — reddb \
2577                             currently provides SNAPSHOT ISOLATION (which PG calls \
2578                             REPEATABLE READ). Use REPEATABLE READ / SNAPSHOT / \
2579                             READ COMMITTED, or omit ISOLATION LEVEL for the default."
2580                                .to_string(),
2581                            self.position(),
2582                        ));
2583                    } else {
2584                        return Err(ParseError::expected(
2585                            vec!["READ", "REPEATABLE", "SNAPSHOT", "SERIALIZABLE"],
2586                            self.peek(),
2587                            self.position(),
2588                        ));
2589                    }
2590                    // All accepted modes map to our snapshot engine today.
2591                    let _ = parts;
2592                }
2593                Ok(SqlCommand::TransactionControl(TxnControl::Begin))
2594            }
2595            // COMMIT [WORK | TRANSACTION]
2596            Token::Commit => {
2597                self.advance()?;
2598                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2599                Ok(SqlCommand::TransactionControl(TxnControl::Commit))
2600            }
2601            // ROLLBACK [WORK | TRANSACTION] [TO [SAVEPOINT] name]
2602            // ROLLBACK MIGRATION name
2603            Token::Rollback => {
2604                self.advance()?;
2605                if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
2606                    match self.parse_rollback_migration_after_keyword()? {
2607                        QueryExpr::RollbackMigration(q) => Ok(SqlCommand::RollbackMigration(q)),
2608                        other => Err(ParseError::new(
2609                            format!(
2610                                "internal: ROLLBACK MIGRATION produced unexpected kind {other:?}"
2611                            ),
2612                            self.position(),
2613                        )),
2614                    }
2615                } else {
2616                    let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2617                    if self.consume(&Token::To)? {
2618                        let _ = self.consume(&Token::Savepoint)?;
2619                        let name = self.expect_ident()?;
2620                        Ok(SqlCommand::TransactionControl(
2621                            TxnControl::RollbackToSavepoint(name),
2622                        ))
2623                    } else {
2624                        Ok(SqlCommand::TransactionControl(TxnControl::Rollback))
2625                    }
2626                }
2627            }
2628            // SAVEPOINT name
2629            Token::Savepoint => {
2630                self.advance()?;
2631                let name = self.expect_ident()?;
2632                Ok(SqlCommand::TransactionControl(TxnControl::Savepoint(name)))
2633            }
2634            // RELEASE [SAVEPOINT] name
2635            Token::Release => {
2636                self.advance()?;
2637                let _ = self.consume(&Token::Savepoint)?;
2638                let name = self.expect_ident()?;
2639                Ok(SqlCommand::TransactionControl(
2640                    TxnControl::ReleaseSavepoint(name),
2641                ))
2642            }
2643            // VACUUM [FULL] [table]
2644            Token::Vacuum => {
2645                self.advance()?;
2646                let full = self.consume(&Token::Full)?;
2647                let target = if self.check(&Token::Eof) {
2648                    None
2649                } else {
2650                    Some(self.expect_ident()?)
2651                };
2652                Ok(SqlCommand::Maintenance(MaintenanceCommand::Vacuum {
2653                    target,
2654                    full,
2655                }))
2656            }
2657            // REFRESH MATERIALIZED VIEW name
2658            Token::Refresh => {
2659                self.advance()?;
2660                self.expect(Token::Materialized)?;
2661                self.expect(Token::View)?;
2662                let name = self.expect_ident()?;
2663                Ok(SqlCommand::RefreshMaterializedView(
2664                    RefreshMaterializedViewQuery { name },
2665                ))
2666            }
2667            // ANALYZE [table]
2668            Token::Analyze => {
2669                self.advance()?;
2670                let target = if self.check(&Token::Eof) {
2671                    None
2672                } else {
2673                    Some(self.expect_ident()?)
2674                };
2675                Ok(SqlCommand::Maintenance(MaintenanceCommand::Analyze {
2676                    target,
2677                }))
2678            }
2679            // COPY table FROM 'path' [WITH (...)] [DELIMITER 'x'] [HEADER [true|false]]
2680            //
2681            // Accepts both PG-style `WITH (FORMAT csv, HEADER true)` and the
2682            // short-form `DELIMITER ',' HEADER`. The only supported format
2683            // today is CSV.
2684            Token::Copy => {
2685                self.advance()?;
2686                let table = self.expect_ident()?;
2687                self.expect(Token::From)?;
2688                let path = self.parse_string()?;
2689
2690                let mut delimiter: Option<char> = None;
2691                let mut has_header = false;
2692                let format = CopyFormat::Csv;
2693
2694                // Optional `WITH (FORMAT csv, HEADER true, DELIMITER ',')` block.
2695                // `WITH` is a reserved keyword token — accept both the keyword
2696                // form and the ident form that non-CTE callers sometimes emit.
2697                if self.consume(&Token::With)? || self.consume_ident_ci("WITH")? {
2698                    self.expect(Token::LParen)?;
2699                    loop {
2700                        if self.consume(&Token::Format)? || self.consume_ident_ci("FORMAT")? {
2701                            let _ = self.consume(&Token::Eq)?;
2702                            // Only CSV for now — accept the ident and move on.
2703                            let _ = self.expect_ident()?;
2704                        } else if self.consume(&Token::Header)? {
2705                            let _ = self.consume(&Token::Eq)?;
2706                            // Accept `HEADER`, `HEADER = true`, `HEADER = false`,
2707                            // or an ident spelling of true/false.
2708                            has_header = match self.peek().clone() {
2709                                Token::True => {
2710                                    self.advance()?;
2711                                    true
2712                                }
2713                                Token::False => {
2714                                    self.advance()?;
2715                                    false
2716                                }
2717                                Token::Ident(ref n) if n.eq_ignore_ascii_case("true") => {
2718                                    self.advance()?;
2719                                    true
2720                                }
2721                                Token::Ident(ref n) if n.eq_ignore_ascii_case("false") => {
2722                                    self.advance()?;
2723                                    false
2724                                }
2725                                _ => true,
2726                            };
2727                        } else if self.consume(&Token::Delimiter)? {
2728                            let _ = self.consume(&Token::Eq)?;
2729                            let s = self.parse_string()?;
2730                            delimiter = s.chars().next();
2731                        } else {
2732                            break;
2733                        }
2734                        if !self.consume(&Token::Comma)? {
2735                            break;
2736                        }
2737                    }
2738                    self.expect(Token::RParen)?;
2739                }
2740
2741                // Short form clauses outside WITH (in either order).
2742                loop {
2743                    if self.consume(&Token::Delimiter)? {
2744                        let s = self.parse_string()?;
2745                        delimiter = s.chars().next();
2746                    } else if self.consume(&Token::Header)? {
2747                        has_header = true;
2748                    } else {
2749                        break;
2750                    }
2751                }
2752
2753                Ok(SqlCommand::CopyFrom(CopyFromQuery {
2754                    table,
2755                    path,
2756                    format,
2757                    delimiter,
2758                    has_header,
2759                }))
2760            }
2761            other => Err(ParseError::expected(
2762                vec![
2763                    "SELECT",
2764                    "FROM",
2765                    "INSERT",
2766                    "UPDATE",
2767                    "DELETE",
2768                    "EXPLAIN",
2769                    "CREATE",
2770                    "DROP",
2771                    "ALTER",
2772                    "SET",
2773                    "SHOW",
2774                    "BEGIN",
2775                    "COMMIT",
2776                    "ROLLBACK",
2777                    "SAVEPOINT",
2778                    "RELEASE",
2779                    "START",
2780                    "VACUUM",
2781                    "ANALYZE",
2782                    "COPY",
2783                    "REFRESH",
2784                    "DESCRIBE",
2785                    "DESC",
2786                ],
2787                other,
2788                self.position(),
2789            )),
2790        }
2791    }
2792}