Skip to main content

reddb_server/storage/query/
sql.rs

1use crate::catalog::CollectionModel;
2use crate::storage::query::ast::{
3    AlterQueueQuery, AlterTableQuery, AlterUserStmt, ApplyMigrationQuery, AskQuery, BinOp,
4    CompareOp, ConfigCommand, CopyFormat, CopyFromQuery, CreateCollectionQuery,
5    CreateForeignTableQuery, CreateIndexQuery, CreateMigrationQuery, CreatePolicyQuery,
6    CreateQueueQuery, CreateSchemaQuery, CreateSequenceQuery, CreateServerQuery, CreateTableQuery,
7    CreateTimeSeriesQuery, CreateTreeQuery, CreateVectorQuery, CreateViewQuery, DeleteQuery,
8    DropCollectionQuery, DropDocumentQuery, DropForeignTableQuery, DropGraphQuery, DropIndexQuery,
9    DropKvQuery, DropPolicyQuery, DropQueueQuery, DropSchemaQuery, DropSequenceQuery,
10    DropServerQuery, DropTableQuery, DropTimeSeriesQuery, DropTreeQuery, DropVectorQuery,
11    DropViewQuery, EventsBackfillQuery, ExplainAlterQuery, ExplainMigrationQuery, Expr, FieldRef,
12    Filter, ForeignColumnDef, GrantStmt, GraphCommand, GraphQuery, HybridQuery, InsertQuery,
13    JoinQuery, KvCommand, MaintenanceCommand, PathQuery, PolicyAction, ProbabilisticCommand,
14    QueryExpr, QueueCommand, QueueSelectQuery, RefreshMaterializedViewQuery, RevokeStmt,
15    RollbackMigrationQuery, SearchCommand, Span, TableQuery, TreeCommand, TruncateQuery,
16    TxnControl, UpdateQuery, VectorQuery,
17};
18use crate::storage::query::parser::{ParseError, Parser, SafeTokenDisplay};
19use crate::storage::query::sql_lowering::filter_to_expr;
20use crate::storage::query::Token;
21use crate::storage::schema::Value;
22
/// Canonical SQL frontend command surface.
///
/// This is the single entrypoint for SQL/RQL-style commands before they are
/// lowered into the broader multi-backend `QueryExpr` space.
///
/// Each variant wraps one of the category enums declared in this file.
/// `SqlStatement::into_command` flattens a statement into the matching
/// `SqlCommand` variant, and `SqlStatement::into_query_expr` lowers it all
/// the way to a `QueryExpr`.
#[derive(Debug, Clone)]
pub enum SqlStatement {
    /// `Select` and `Join` queries (see `SqlQuery`).
    Query(SqlQuery),
    /// `Insert`, `Update` and `Delete` statements (see `SqlMutation`).
    Mutation(SqlMutation),
    /// Create/alter/drop-style commands and the other members of
    /// `SqlSchemaCommand`.
    Schema(SqlSchemaCommand),
    /// Config, secret, tenant, transaction, maintenance and access-control
    /// commands (see `SqlAdminCommand`).
    Admin(SqlAdminCommand),
}
34
/// Top-level statement produced by `parse_frontend`.
///
/// `Sql` carries a categorized `SqlStatement`; every other variant wraps an
/// AST node that `FrontendStatement::into_query_expr` moves unchanged into a
/// corresponding `QueryExpr` variant.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum FrontendStatement {
    /// A SQL statement, lowered through `SqlStatement::into_query_expr`.
    Sql(SqlStatement),
    Graph(GraphQuery),
    GraphCommand(GraphCommand),
    Path(PathQuery),
    Vector(VectorQuery),
    Hybrid(HybridQuery),
    /// Lowered to `QueryExpr::SearchCommand`.
    Search(SearchCommand),
    Ask(AskQuery),
    QueueSelect(QueueSelectQuery),
    QueueCommand(QueueCommand),
    EventsBackfill(EventsBackfillQuery),
    /// Status request for a backfill; `collection` is carried through as-is.
    EventsBackfillStatus { collection: String },
    TreeCommand(TreeCommand),
    ProbabilisticCommand(ProbabilisticCommand),
    KvCommand(KvCommand),
    ConfigCommand(ConfigCommand),
}
55
/// Flat enumeration of every SQL frontend command.
///
/// This is the un-categorized counterpart of `SqlStatement`:
/// `SqlStatement::into_command` produces it, `SqlCommand::into_statement`
/// regroups it into the `Query` / `Mutation` / `Schema` / `Admin` categories,
/// and `SqlCommand::into_query_expr` lowers it to a `QueryExpr`.
#[derive(Debug, Clone)]
pub enum SqlCommand {
    // Queries (categorized under `SqlQuery`).
    Select(TableQuery),
    Join(JoinQuery),
    // Mutations (categorized under `SqlMutation`).
    Insert(InsertQuery),
    Update(UpdateQuery),
    Delete(DeleteQuery),
    // Schema commands (categorized under `SqlSchemaCommand`).
    ExplainAlter(ExplainAlterQuery),
    CreateTable(CreateTableQuery),
    CreateCollection(CreateCollectionQuery),
    CreateVector(CreateVectorQuery),
    DropTable(DropTableQuery),
    DropGraph(DropGraphQuery),
    DropVector(DropVectorQuery),
    DropDocument(DropDocumentQuery),
    DropKv(DropKvQuery),
    DropCollection(DropCollectionQuery),
    Truncate(TruncateQuery),
    AlterTable(AlterTableQuery),
    CreateIndex(CreateIndexQuery),
    DropIndex(DropIndexQuery),
    CreateTimeSeries(CreateTimeSeriesQuery),
    DropTimeSeries(DropTimeSeriesQuery),
    CreateQueue(CreateQueueQuery),
    AlterQueue(AlterQueueQuery),
    DropQueue(DropQueueQuery),
    CreateTree(CreateTreeQuery),
    DropTree(DropTreeQuery),
    /// Lowered to `QueryExpr::ProbabilisticCommand`.
    Probabilistic(ProbabilisticCommand),
    // Admin commands (categorized under `SqlAdminCommand`).
    SetConfig {
        key: String,
        value: Value,
    },
    ShowConfig {
        prefix: Option<String>,
    },
    SetSecret {
        key: String,
        value: Value,
    },
    DeleteSecret {
        key: String,
    },
    ShowSecrets {
        prefix: Option<String>,
    },
    SetTenant(Option<String>),
    ShowTenant,
    TransactionControl(TxnControl),
    /// Lowered to `QueryExpr::MaintenanceCommand`.
    Maintenance(MaintenanceCommand),
    // Further schema commands (categorized under `SqlSchemaCommand`).
    CreateSchema(CreateSchemaQuery),
    DropSchema(DropSchemaQuery),
    CreateSequence(CreateSequenceQuery),
    DropSequence(DropSequenceQuery),
    CopyFrom(CopyFromQuery),
    CreateView(CreateViewQuery),
    DropView(DropViewQuery),
    RefreshMaterializedView(RefreshMaterializedViewQuery),
    CreatePolicy(CreatePolicyQuery),
    DropPolicy(DropPolicyQuery),
    CreateServer(CreateServerQuery),
    DropServer(DropServerQuery),
    CreateForeignTable(CreateForeignTableQuery),
    DropForeignTable(DropForeignTableQuery),
    /// `GRANT … ON … TO …`
    Grant(GrantStmt),
    /// `REVOKE … ON … FROM …`
    Revoke(RevokeStmt),
    /// `ALTER USER name <attrs>`
    AlterUser(AlterUserStmt),
    /// IAM policy DDL (CREATE POLICY '...' AS '...', DROP POLICY '...',
    /// ATTACH/DETACH POLICY, SHOW POLICIES, SIMULATE, SHOW EFFECTIVE
    /// PERMISSIONS). Stored as a pre-built QueryExpr so the dispatcher
    /// can route the multitude of shapes through a single arm.
    IamPolicy(QueryExpr),
    // Migration commands (categorized under `SqlSchemaCommand`).
    CreateMigration(CreateMigrationQuery),
    ApplyMigration(ApplyMigrationQuery),
    RollbackMigration(RollbackMigrationQuery),
    ExplainMigration(ExplainMigrationQuery),
}
136
137fn collection_model_filter(model: &str) -> Filter {
138    Filter::Compare {
139        field: FieldRef::column("", "model"),
140        op: CompareOp::Eq,
141        value: Value::Text(model.to_string().into()),
142    }
143}
144
145fn add_table_filter(query: &mut TableQuery, filter: Filter) {
146    let combined = match query.filter.take() {
147        Some(existing) => existing.and(filter),
148        None => filter,
149    };
150    query.where_expr = Some(filter_to_expr(&combined));
151    query.filter = Some(combined);
152}
153
154fn parse_show_collections_by_model(
155    parser: &mut Parser<'_>,
156    model: &str,
157) -> Result<TableQuery, ParseError> {
158    let mut query = TableQuery::new("red.collections");
159    parser.parse_table_clauses(&mut query)?;
160    add_table_filter(&mut query, collection_model_filter(model));
161    Ok(query)
162}
163
/// Query category of `SqlStatement`: statements that `into_command` maps to
/// `SqlCommand::Select` / `SqlCommand::Join`.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum SqlQuery {
    /// Single-table query; lowered to `QueryExpr::Table`.
    Select(TableQuery),
    /// Join query; lowered to `QueryExpr::Join`.
    Join(JoinQuery),
}
170
/// Mutation category of `SqlStatement`: insert, update and delete
/// statements, each wrapping its AST query node.
#[derive(Debug, Clone)]
pub enum SqlMutation {
    Insert(InsertQuery),
    Update(UpdateQuery),
    Delete(DeleteQuery),
}
177
/// Schema category of `SqlStatement`.
///
/// Every variant has a same-named counterpart in `SqlCommand`;
/// `SqlStatement::into_command` and `SqlCommand::into_statement` convert
/// between the two one-to-one.
#[derive(Debug, Clone)]
pub enum SqlSchemaCommand {
    ExplainAlter(ExplainAlterQuery),
    CreateTable(CreateTableQuery),
    CreateCollection(CreateCollectionQuery),
    CreateVector(CreateVectorQuery),
    DropTable(DropTableQuery),
    DropGraph(DropGraphQuery),
    DropVector(DropVectorQuery),
    DropDocument(DropDocumentQuery),
    DropKv(DropKvQuery),
    DropCollection(DropCollectionQuery),
    Truncate(TruncateQuery),
    AlterTable(AlterTableQuery),
    CreateIndex(CreateIndexQuery),
    DropIndex(DropIndexQuery),
    CreateTimeSeries(CreateTimeSeriesQuery),
    DropTimeSeries(DropTimeSeriesQuery),
    CreateQueue(CreateQueueQuery),
    AlterQueue(AlterQueueQuery),
    DropQueue(DropQueueQuery),
    CreateTree(CreateTreeQuery),
    DropTree(DropTreeQuery),
    /// Probabilistic-structure command; grouped with schema commands here.
    Probabilistic(ProbabilisticCommand),
    CreateSchema(CreateSchemaQuery),
    DropSchema(DropSchemaQuery),
    CreateSequence(CreateSequenceQuery),
    DropSequence(DropSequenceQuery),
    /// `COPY ... FROM` command; grouped with schema commands here.
    CopyFrom(CopyFromQuery),
    CreateView(CreateViewQuery),
    DropView(DropViewQuery),
    RefreshMaterializedView(RefreshMaterializedViewQuery),
    CreatePolicy(CreatePolicyQuery),
    DropPolicy(DropPolicyQuery),
    CreateServer(CreateServerQuery),
    DropServer(DropServerQuery),
    CreateForeignTable(CreateForeignTableQuery),
    DropForeignTable(DropForeignTableQuery),
    CreateMigration(CreateMigrationQuery),
    ApplyMigration(ApplyMigrationQuery),
    RollbackMigration(RollbackMigrationQuery),
    ExplainMigration(ExplainMigrationQuery),
}
221
/// Admin category of `SqlStatement`: configuration, secrets, tenant,
/// transaction control, maintenance and access-control commands.
///
/// Every variant has a same-named counterpart in `SqlCommand`.
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum SqlAdminCommand {
    SetConfig { key: String, value: Value },
    /// `prefix` is optional; how `None` is interpreted is decided downstream.
    ShowConfig { prefix: Option<String> },
    SetSecret { key: String, value: Value },
    DeleteSecret { key: String },
    /// `prefix` is optional; how `None` is interpreted is decided downstream.
    ShowSecrets { prefix: Option<String> },
    /// NOTE(review): `None` presumably clears the tenant — confirm against
    /// the executor.
    SetTenant(Option<String>),
    ShowTenant,
    TransactionControl(TxnControl),
    Maintenance(MaintenanceCommand),
    Grant(GrantStmt),
    Revoke(RevokeStmt),
    AlterUser(AlterUserStmt),
    /// Pre-built `QueryExpr`; `SqlCommand::into_query_expr` returns it as-is.
    IamPolicy(QueryExpr),
}
239
240impl SqlStatement {
241    pub fn into_command(self) -> SqlCommand {
242        match self {
243            SqlStatement::Query(SqlQuery::Select(query)) => SqlCommand::Select(query),
244            SqlStatement::Query(SqlQuery::Join(query)) => SqlCommand::Join(query),
245            SqlStatement::Mutation(SqlMutation::Insert(query)) => SqlCommand::Insert(query),
246            SqlStatement::Mutation(SqlMutation::Update(query)) => SqlCommand::Update(query),
247            SqlStatement::Mutation(SqlMutation::Delete(query)) => SqlCommand::Delete(query),
248            SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query)) => {
249                SqlCommand::ExplainAlter(query)
250            }
251            SqlStatement::Schema(SqlSchemaCommand::CreateTable(query)) => {
252                SqlCommand::CreateTable(query)
253            }
254            SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query)) => {
255                SqlCommand::CreateCollection(query)
256            }
257            SqlStatement::Schema(SqlSchemaCommand::CreateVector(query)) => {
258                SqlCommand::CreateVector(query)
259            }
260            SqlStatement::Schema(SqlSchemaCommand::DropTable(query)) => {
261                SqlCommand::DropTable(query)
262            }
263            SqlStatement::Schema(SqlSchemaCommand::DropGraph(query)) => {
264                SqlCommand::DropGraph(query)
265            }
266            SqlStatement::Schema(SqlSchemaCommand::DropVector(query)) => {
267                SqlCommand::DropVector(query)
268            }
269            SqlStatement::Schema(SqlSchemaCommand::DropDocument(query)) => {
270                SqlCommand::DropDocument(query)
271            }
272            SqlStatement::Schema(SqlSchemaCommand::DropKv(query)) => SqlCommand::DropKv(query),
273            SqlStatement::Schema(SqlSchemaCommand::DropCollection(query)) => {
274                SqlCommand::DropCollection(query)
275            }
276            SqlStatement::Schema(SqlSchemaCommand::Truncate(query)) => SqlCommand::Truncate(query),
277            SqlStatement::Schema(SqlSchemaCommand::AlterTable(query)) => {
278                SqlCommand::AlterTable(query)
279            }
280            SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query)) => {
281                SqlCommand::CreateIndex(query)
282            }
283            SqlStatement::Schema(SqlSchemaCommand::DropIndex(query)) => {
284                SqlCommand::DropIndex(query)
285            }
286            SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query)) => {
287                SqlCommand::CreateTimeSeries(query)
288            }
289            SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query)) => {
290                SqlCommand::DropTimeSeries(query)
291            }
292            SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query)) => {
293                SqlCommand::CreateQueue(query)
294            }
295            SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query)) => {
296                SqlCommand::AlterQueue(query)
297            }
298            SqlStatement::Schema(SqlSchemaCommand::DropQueue(query)) => {
299                SqlCommand::DropQueue(query)
300            }
301            SqlStatement::Schema(SqlSchemaCommand::CreateTree(query)) => {
302                SqlCommand::CreateTree(query)
303            }
304            SqlStatement::Schema(SqlSchemaCommand::DropTree(query)) => SqlCommand::DropTree(query),
305            SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command)) => {
306                SqlCommand::Probabilistic(command)
307            }
308            SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value }) => {
309                SqlCommand::SetConfig { key, value }
310            }
311            SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix }) => {
312                SqlCommand::ShowConfig { prefix }
313            }
314            SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value }) => {
315                SqlCommand::SetSecret { key, value }
316            }
317            SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key }) => {
318                SqlCommand::DeleteSecret { key }
319            }
320            SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix }) => {
321                SqlCommand::ShowSecrets { prefix }
322            }
323            SqlStatement::Admin(SqlAdminCommand::SetTenant(value)) => SqlCommand::SetTenant(value),
324            SqlStatement::Admin(SqlAdminCommand::ShowTenant) => SqlCommand::ShowTenant,
325            SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl)) => {
326                SqlCommand::TransactionControl(ctl)
327            }
328            SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)) => SqlCommand::Maintenance(cmd),
329            SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)) => SqlCommand::CreateSchema(q),
330            SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)) => SqlCommand::DropSchema(q),
331            SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q)) => {
332                SqlCommand::CreateSequence(q)
333            }
334            SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)) => SqlCommand::DropSequence(q),
335            SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)) => SqlCommand::CopyFrom(q),
336            SqlStatement::Schema(SqlSchemaCommand::CreateView(q)) => SqlCommand::CreateView(q),
337            SqlStatement::Schema(SqlSchemaCommand::DropView(q)) => SqlCommand::DropView(q),
338            SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q)) => {
339                SqlCommand::RefreshMaterializedView(q)
340            }
341            SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)) => SqlCommand::CreatePolicy(q),
342            SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)) => SqlCommand::DropPolicy(q),
343            SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)) => SqlCommand::CreateServer(q),
344            SqlStatement::Schema(SqlSchemaCommand::DropServer(q)) => SqlCommand::DropServer(q),
345            SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q)) => {
346                SqlCommand::CreateForeignTable(q)
347            }
348            SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q)) => {
349                SqlCommand::DropForeignTable(q)
350            }
351            SqlStatement::Admin(SqlAdminCommand::Grant(s)) => SqlCommand::Grant(s),
352            SqlStatement::Admin(SqlAdminCommand::Revoke(s)) => SqlCommand::Revoke(s),
353            SqlStatement::Admin(SqlAdminCommand::AlterUser(s)) => SqlCommand::AlterUser(s),
354            SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)) => SqlCommand::IamPolicy(e),
355            SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q)) => {
356                SqlCommand::CreateMigration(q)
357            }
358            SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q)) => {
359                SqlCommand::ApplyMigration(q)
360            }
361            SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q)) => {
362                SqlCommand::RollbackMigration(q)
363            }
364            SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q)) => {
365                SqlCommand::ExplainMigration(q)
366            }
367        }
368    }
369
370    pub fn into_query_expr(self) -> QueryExpr {
371        self.into_command().into_query_expr()
372    }
373}
374
375impl FrontendStatement {
376    pub fn into_query_expr(self) -> QueryExpr {
377        match self {
378            FrontendStatement::Sql(statement) => statement.into_query_expr(),
379            FrontendStatement::Graph(query) => QueryExpr::Graph(query),
380            FrontendStatement::GraphCommand(command) => QueryExpr::GraphCommand(command),
381            FrontendStatement::Path(query) => QueryExpr::Path(query),
382            FrontendStatement::Vector(query) => QueryExpr::Vector(query),
383            FrontendStatement::Hybrid(query) => QueryExpr::Hybrid(query),
384            FrontendStatement::Search(command) => QueryExpr::SearchCommand(command),
385            FrontendStatement::Ask(query) => QueryExpr::Ask(query),
386            FrontendStatement::QueueSelect(query) => QueryExpr::QueueSelect(query),
387            FrontendStatement::QueueCommand(command) => QueryExpr::QueueCommand(command),
388            FrontendStatement::EventsBackfill(query) => QueryExpr::EventsBackfill(query),
389            FrontendStatement::EventsBackfillStatus { collection } => {
390                QueryExpr::EventsBackfillStatus { collection }
391            }
392            FrontendStatement::TreeCommand(command) => QueryExpr::TreeCommand(command),
393            FrontendStatement::ProbabilisticCommand(command) => {
394                QueryExpr::ProbabilisticCommand(command)
395            }
396            FrontendStatement::KvCommand(command) => QueryExpr::KvCommand(command),
397            FrontendStatement::ConfigCommand(command) => QueryExpr::ConfigCommand(command),
398        }
399    }
400}
401
402pub fn parse_frontend(input: &str) -> Result<FrontendStatement, ParseError> {
403    let mut parser = Parser::new(input)?;
404    let statement = parser.parse_frontend_statement()?;
405    if !parser.check(&Token::Eof) {
406        return Err(ParseError::new(
407            // F-05: `Token::Ident` / `Token::String` / `Token::JsonLiteral`
408            // Display arms emit raw user bytes. Render via `{:?}` so
409            // embedded CR/LF/NUL/quotes are escaped before the message
410            // reaches downstream JSON / audit / log / gRPC sinks.
411            format!("Unexpected token after query: {:?}", parser.current.token),
412            parser.position(),
413        ));
414    }
415    Ok(statement)
416}
417
418impl SqlCommand {
419    pub fn into_query_expr(self) -> QueryExpr {
420        match self {
421            SqlCommand::Select(query) => QueryExpr::Table(query),
422            SqlCommand::Join(query) => QueryExpr::Join(query),
423            SqlCommand::Insert(query) => QueryExpr::Insert(query),
424            SqlCommand::Update(query) => QueryExpr::Update(query),
425            SqlCommand::Delete(query) => QueryExpr::Delete(query),
426            SqlCommand::ExplainAlter(query) => QueryExpr::ExplainAlter(query),
427            SqlCommand::CreateTable(query) => QueryExpr::CreateTable(query),
428            SqlCommand::CreateCollection(query) => QueryExpr::CreateCollection(query),
429            SqlCommand::CreateVector(query) => QueryExpr::CreateVector(query),
430            SqlCommand::DropTable(query) => QueryExpr::DropTable(query),
431            SqlCommand::DropGraph(query) => QueryExpr::DropGraph(query),
432            SqlCommand::DropVector(query) => QueryExpr::DropVector(query),
433            SqlCommand::DropDocument(query) => QueryExpr::DropDocument(query),
434            SqlCommand::DropKv(query) => QueryExpr::DropKv(query),
435            SqlCommand::DropCollection(query) => QueryExpr::DropCollection(query),
436            SqlCommand::Truncate(query) => QueryExpr::Truncate(query),
437            SqlCommand::AlterTable(query) => QueryExpr::AlterTable(query),
438            SqlCommand::CreateIndex(query) => QueryExpr::CreateIndex(query),
439            SqlCommand::DropIndex(query) => QueryExpr::DropIndex(query),
440            SqlCommand::CreateTimeSeries(query) => QueryExpr::CreateTimeSeries(query),
441            SqlCommand::DropTimeSeries(query) => QueryExpr::DropTimeSeries(query),
442            SqlCommand::CreateQueue(query) => QueryExpr::CreateQueue(query),
443            SqlCommand::AlterQueue(query) => QueryExpr::AlterQueue(query),
444            SqlCommand::DropQueue(query) => QueryExpr::DropQueue(query),
445            SqlCommand::CreateTree(query) => QueryExpr::CreateTree(query),
446            SqlCommand::DropTree(query) => QueryExpr::DropTree(query),
447            SqlCommand::Probabilistic(command) => QueryExpr::ProbabilisticCommand(command),
448            SqlCommand::SetConfig { key, value } => QueryExpr::SetConfig { key, value },
449            SqlCommand::ShowConfig { prefix } => QueryExpr::ShowConfig { prefix },
450            SqlCommand::SetSecret { key, value } => QueryExpr::SetSecret { key, value },
451            SqlCommand::DeleteSecret { key } => QueryExpr::DeleteSecret { key },
452            SqlCommand::ShowSecrets { prefix } => QueryExpr::ShowSecrets { prefix },
453            SqlCommand::SetTenant(value) => QueryExpr::SetTenant(value),
454            SqlCommand::ShowTenant => QueryExpr::ShowTenant,
455            SqlCommand::TransactionControl(ctl) => QueryExpr::TransactionControl(ctl),
456            SqlCommand::Maintenance(cmd) => QueryExpr::MaintenanceCommand(cmd),
457            SqlCommand::CreateSchema(q) => QueryExpr::CreateSchema(q),
458            SqlCommand::DropSchema(q) => QueryExpr::DropSchema(q),
459            SqlCommand::CreateSequence(q) => QueryExpr::CreateSequence(q),
460            SqlCommand::DropSequence(q) => QueryExpr::DropSequence(q),
461            SqlCommand::CopyFrom(q) => QueryExpr::CopyFrom(q),
462            SqlCommand::CreateView(q) => QueryExpr::CreateView(q),
463            SqlCommand::DropView(q) => QueryExpr::DropView(q),
464            SqlCommand::RefreshMaterializedView(q) => QueryExpr::RefreshMaterializedView(q),
465            SqlCommand::CreatePolicy(q) => QueryExpr::CreatePolicy(q),
466            SqlCommand::DropPolicy(q) => QueryExpr::DropPolicy(q),
467            SqlCommand::CreateServer(q) => QueryExpr::CreateServer(q),
468            SqlCommand::DropServer(q) => QueryExpr::DropServer(q),
469            SqlCommand::CreateForeignTable(q) => QueryExpr::CreateForeignTable(q),
470            SqlCommand::DropForeignTable(q) => QueryExpr::DropForeignTable(q),
471            SqlCommand::Grant(s) => QueryExpr::Grant(s),
472            SqlCommand::Revoke(s) => QueryExpr::Revoke(s),
473            SqlCommand::AlterUser(s) => QueryExpr::AlterUser(s),
474            SqlCommand::IamPolicy(e) => e,
475            SqlCommand::CreateMigration(q) => QueryExpr::CreateMigration(q),
476            SqlCommand::ApplyMigration(q) => QueryExpr::ApplyMigration(q),
477            SqlCommand::RollbackMigration(q) => QueryExpr::RollbackMigration(q),
478            SqlCommand::ExplainMigration(q) => QueryExpr::ExplainMigration(q),
479        }
480    }
481
482    pub fn into_statement(self) -> SqlStatement {
483        match self {
484            SqlCommand::Select(query) => SqlStatement::Query(SqlQuery::Select(query)),
485            SqlCommand::Join(query) => SqlStatement::Query(SqlQuery::Join(query)),
486            SqlCommand::Insert(query) => SqlStatement::Mutation(SqlMutation::Insert(query)),
487            SqlCommand::Update(query) => SqlStatement::Mutation(SqlMutation::Update(query)),
488            SqlCommand::Delete(query) => SqlStatement::Mutation(SqlMutation::Delete(query)),
489            SqlCommand::ExplainAlter(query) => {
490                SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query))
491            }
492            SqlCommand::CreateTable(query) => {
493                SqlStatement::Schema(SqlSchemaCommand::CreateTable(query))
494            }
495            SqlCommand::CreateCollection(query) => {
496                SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query))
497            }
498            SqlCommand::CreateVector(query) => {
499                SqlStatement::Schema(SqlSchemaCommand::CreateVector(query))
500            }
501            SqlCommand::DropTable(query) => {
502                SqlStatement::Schema(SqlSchemaCommand::DropTable(query))
503            }
504            SqlCommand::DropGraph(query) => {
505                SqlStatement::Schema(SqlSchemaCommand::DropGraph(query))
506            }
507            SqlCommand::DropVector(query) => {
508                SqlStatement::Schema(SqlSchemaCommand::DropVector(query))
509            }
510            SqlCommand::DropDocument(query) => {
511                SqlStatement::Schema(SqlSchemaCommand::DropDocument(query))
512            }
513            SqlCommand::DropKv(query) => SqlStatement::Schema(SqlSchemaCommand::DropKv(query)),
514            SqlCommand::DropCollection(query) => {
515                SqlStatement::Schema(SqlSchemaCommand::DropCollection(query))
516            }
517            SqlCommand::Truncate(query) => SqlStatement::Schema(SqlSchemaCommand::Truncate(query)),
518            SqlCommand::AlterTable(query) => {
519                SqlStatement::Schema(SqlSchemaCommand::AlterTable(query))
520            }
521            SqlCommand::CreateIndex(query) => {
522                SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query))
523            }
524            SqlCommand::DropIndex(query) => {
525                SqlStatement::Schema(SqlSchemaCommand::DropIndex(query))
526            }
527            SqlCommand::CreateTimeSeries(query) => {
528                SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query))
529            }
530            SqlCommand::DropTimeSeries(query) => {
531                SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query))
532            }
533            SqlCommand::CreateQueue(query) => {
534                SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query))
535            }
536            SqlCommand::AlterQueue(query) => {
537                SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query))
538            }
539            SqlCommand::DropQueue(query) => {
540                SqlStatement::Schema(SqlSchemaCommand::DropQueue(query))
541            }
542            SqlCommand::CreateTree(query) => {
543                SqlStatement::Schema(SqlSchemaCommand::CreateTree(query))
544            }
545            SqlCommand::DropTree(query) => SqlStatement::Schema(SqlSchemaCommand::DropTree(query)),
546            SqlCommand::Probabilistic(command) => {
547                SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command))
548            }
549            SqlCommand::SetConfig { key, value } => {
550                SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value })
551            }
552            SqlCommand::ShowConfig { prefix } => {
553                SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix })
554            }
555            SqlCommand::SetSecret { key, value } => {
556                SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value })
557            }
558            SqlCommand::DeleteSecret { key } => {
559                SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key })
560            }
561            SqlCommand::ShowSecrets { prefix } => {
562                SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix })
563            }
564            SqlCommand::SetTenant(value) => SqlStatement::Admin(SqlAdminCommand::SetTenant(value)),
565            SqlCommand::ShowTenant => SqlStatement::Admin(SqlAdminCommand::ShowTenant),
566            SqlCommand::TransactionControl(ctl) => {
567                SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl))
568            }
569            SqlCommand::Maintenance(cmd) => SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)),
570            SqlCommand::CreateSchema(q) => SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)),
571            SqlCommand::DropSchema(q) => SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)),
572            SqlCommand::CreateSequence(q) => {
573                SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q))
574            }
575            SqlCommand::DropSequence(q) => SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)),
576            SqlCommand::CopyFrom(q) => SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)),
577            SqlCommand::CreateView(q) => SqlStatement::Schema(SqlSchemaCommand::CreateView(q)),
578            SqlCommand::DropView(q) => SqlStatement::Schema(SqlSchemaCommand::DropView(q)),
579            SqlCommand::RefreshMaterializedView(q) => {
580                SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q))
581            }
582            SqlCommand::CreatePolicy(q) => SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)),
583            SqlCommand::DropPolicy(q) => SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)),
584            SqlCommand::CreateServer(q) => SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)),
585            SqlCommand::DropServer(q) => SqlStatement::Schema(SqlSchemaCommand::DropServer(q)),
586            SqlCommand::CreateForeignTable(q) => {
587                SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q))
588            }
589            SqlCommand::DropForeignTable(q) => {
590                SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q))
591            }
592            SqlCommand::Grant(s) => SqlStatement::Admin(SqlAdminCommand::Grant(s)),
593            SqlCommand::Revoke(s) => SqlStatement::Admin(SqlAdminCommand::Revoke(s)),
594            SqlCommand::AlterUser(s) => SqlStatement::Admin(SqlAdminCommand::AlterUser(s)),
595            SqlCommand::IamPolicy(e) => SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)),
596            SqlCommand::CreateMigration(q) => {
597                SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q))
598            }
599            SqlCommand::ApplyMigration(q) => {
600                SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q))
601            }
602            SqlCommand::RollbackMigration(q) => {
603                SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q))
604            }
605            SqlCommand::ExplainMigration(q) => {
606                SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q))
607            }
608        }
609    }
610}
611
612impl<'a> Parser<'a> {
613    fn parse_events_command(&mut self) -> Result<QueryExpr, ParseError> {
614        self.expect_ident()?; // EVENTS
615        if self.consume_ident_ci("STATUS")? {
616            let mut query = TableQuery::new("red.subscriptions");
617            let collection = match self.peek().clone() {
618                Token::Ident(name) => {
619                    self.advance()?;
620                    Some(name)
621                }
622                Token::String(name) => {
623                    self.advance()?;
624                    Some(name)
625                }
626                _ => None,
627            };
628            self.parse_table_clauses(&mut query)?;
629            if let Some(collection) = collection {
630                let filter = Filter::compare(
631                    FieldRef::column("red.subscriptions", "collection"),
632                    CompareOp::Eq,
633                    Value::text(collection),
634                );
635                let expr = filter_to_expr(&filter);
636                query.where_expr = Some(match query.where_expr.take() {
637                    Some(existing) => Expr::binop(BinOp::And, existing, expr),
638                    None => expr,
639                });
640                query.filter = Some(match query.filter.take() {
641                    Some(existing) => existing.and(filter),
642                    None => filter,
643                });
644            }
645            return Ok(QueryExpr::Table(query));
646        }
647
648        if !self.consume_ident_ci("BACKFILL")? {
649            return Err(ParseError::expected(
650                vec!["BACKFILL", "STATUS"],
651                self.peek(),
652                self.position(),
653            ));
654        }
655
656        if self.consume_ident_ci("STATUS")? {
657            let collection = self.expect_ident()?;
658            return Ok(QueryExpr::EventsBackfillStatus { collection });
659        }
660
661        let collection = self.expect_ident()?;
662        let where_filter = if self.consume(&Token::Where)? {
663            let mut parts = Vec::new();
664            while !self.check(&Token::Eof) && !self.check(&Token::To) {
665                parts.push(self.peek().to_string());
666                self.advance()?;
667            }
668            if parts.is_empty() {
669                return Err(ParseError::expected(
670                    vec!["predicate"],
671                    self.peek(),
672                    self.position(),
673                ));
674            }
675            Some(parts.join(" "))
676        } else {
677            None
678        };
679
680        self.expect(Token::To)?;
681        let target_queue = self.expect_ident()?;
682        let limit = if self.consume(&Token::Limit)? {
683            Some(self.parse_positive_integer("LIMIT")? as u64)
684        } else {
685            None
686        };
687
688        Ok(QueryExpr::EventsBackfill(EventsBackfillQuery {
689            collection,
690            where_filter,
691            target_queue,
692            limit,
693        }))
694    }
695
696    /// Parse an optional `OPTIONS (key 'value', key2 'value2', ...)` clause
697    /// used by Phase 3.2 FDW DDL statements. Returns an empty vec when the
698    /// clause is absent. Values are always single-quoted string literals —
699    /// consistent with PG's generic-options model.
700    pub(crate) fn parse_fdw_options_clause(&mut self) -> Result<Vec<(String, String)>, ParseError> {
701        if !self.consume(&Token::Options)? {
702            return Ok(Vec::new());
703        }
704        self.expect(Token::LParen)?;
705        let mut out: Vec<(String, String)> = Vec::new();
706        loop {
707            // Option keys frequently collide with reserved words
708            // (`path`, `format`, `delimiter`, `header`, …) — accept
709            // the keyword form and lowercase it so downstream
710            // option-name matching stays case-insensitive.
711            let was_ident = matches!(self.peek(), Token::Ident(_));
712            let raw = self.expect_ident_or_keyword()?;
713            let key = if was_ident {
714                raw
715            } else {
716                raw.to_ascii_lowercase()
717            };
718            // Value is a single-quoted string literal.
719            let value = self.parse_string()?;
720            out.push((key, value));
721            if !self.consume(&Token::Comma)? {
722                break;
723            }
724        }
725        self.expect(Token::RParen)?;
726        Ok(out)
727    }
728
729    /// Parse any top-level frontend statement through a single shared surface.
730    pub fn parse_frontend_statement(&mut self) -> Result<FrontendStatement, ParseError> {
731        match self.peek() {
732            Token::Select => match self.parse_select_query()? {
733                QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
734                    SqlQuery::Select(query),
735                ))),
736                QueryExpr::Join(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
737                    SqlQuery::Join(query),
738                ))),
739                QueryExpr::QueueSelect(query) => Ok(FrontendStatement::QueueSelect(query)),
740                other => Err(ParseError::new(
741                    format!("internal: SELECT produced unexpected query kind {other:?}"),
742                    self.position(),
743                )),
744            },
745            Token::From
746            | Token::Insert
747            | Token::Update
748            | Token::Truncate
749            | Token::Create
750            | Token::Drop
751            | Token::Alter
752            | Token::Set
753            | Token::Begin
754            | Token::Commit
755            | Token::Rollback
756            | Token::Savepoint
757            | Token::Release
758            | Token::Start
759            | Token::Vacuum
760            | Token::Analyze
761            | Token::Copy
762            | Token::Refresh => self.parse_sql_statement().map(FrontendStatement::Sql),
763            Token::Explain => {
764                if matches!(
765                    self.peek_next()?,
766                    Token::Ident(name) if name.eq_ignore_ascii_case("ASK")
767                ) {
768                    match self.parse_explain_ask_query()? {
769                        QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
770                        other => Err(ParseError::new(
771                            format!(
772                                "internal: EXPLAIN ASK produced unexpected query kind {other:?}"
773                            ),
774                            self.position(),
775                        )),
776                    }
777                } else {
778                    self.parse_sql_statement().map(FrontendStatement::Sql)
779                }
780            }
781            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
782                self.parse_sql_statement().map(FrontendStatement::Sql)
783            }
784            Token::Desc => self.parse_sql_statement().map(FrontendStatement::Sql),
785            Token::Ident(name)
786                if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
787            {
788                self.parse_sql_statement().map(FrontendStatement::Sql)
789            }
790            Token::Ident(name)
791                if name.eq_ignore_ascii_case("GRANT")
792                    || name.eq_ignore_ascii_case("REVOKE")
793                    || name.eq_ignore_ascii_case("SIMULATE")
794                    || name.eq_ignore_ascii_case("APPLY") =>
795            {
796                self.parse_sql_statement().map(FrontendStatement::Sql)
797            }
798            Token::Ident(name) if name.eq_ignore_ascii_case("WATCH") => {
799                self.advance()?;
800                if matches!(
801                    self.peek(),
802                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
803                ) {
804                    match self.parse_config_watch_after_watch()? {
805                        QueryExpr::ConfigCommand(command) => {
806                            Ok(FrontendStatement::ConfigCommand(command))
807                        }
808                        other => Err(ParseError::new(
809                            format!(
810                                "internal: WATCH CONFIG produced unexpected query kind {other:?}"
811                            ),
812                            self.position(),
813                        )),
814                    }
815                } else if matches!(
816                    self.peek(),
817                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
818                ) {
819                    match self.parse_vault_watch_after_watch()? {
820                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
821                        other => Err(ParseError::new(
822                            format!(
823                                "internal: WATCH VAULT produced unexpected query kind {other:?}"
824                            ),
825                            self.position(),
826                        )),
827                    }
828                } else {
829                    match self.parse_kv_watch(crate::catalog::CollectionModel::Kv)? {
830                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
831                        other => Err(ParseError::new(
832                            format!("internal: WATCH produced unexpected query kind {other:?}"),
833                            self.position(),
834                        )),
835                    }
836                }
837            }
838            Token::List => {
839                self.advance()?;
840                if matches!(
841                    self.peek(),
842                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
843                ) {
844                    match self.parse_config_list_after_list()? {
845                        QueryExpr::ConfigCommand(command) => {
846                            Ok(FrontendStatement::ConfigCommand(command))
847                        }
848                        other => Err(ParseError::new(
849                            format!(
850                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
851                            ),
852                            self.position(),
853                        )),
854                    }
855                } else if matches!(
856                    self.peek(),
857                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
858                ) {
859                    match self.parse_vault_list_after_list()? {
860                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
861                        other => Err(ParseError::new(
862                            format!(
863                                "internal: LIST VAULT produced unexpected query kind {other:?}"
864                            ),
865                            self.position(),
866                        )),
867                    }
868                } else {
869                    Err(ParseError::expected(
870                        vec!["CONFIG", "VAULT"],
871                        self.peek(),
872                        self.position(),
873                    ))
874                }
875            }
876            Token::Ident(name) if name.eq_ignore_ascii_case("LIST") => {
877                self.advance()?;
878                if matches!(
879                    self.peek(),
880                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
881                ) {
882                    match self.parse_config_list_after_list()? {
883                        QueryExpr::ConfigCommand(command) => {
884                            Ok(FrontendStatement::ConfigCommand(command))
885                        }
886                        other => Err(ParseError::new(
887                            format!(
888                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
889                            ),
890                            self.position(),
891                        )),
892                    }
893                } else if matches!(
894                    self.peek(),
895                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
896                ) {
897                    match self.parse_vault_list_after_list()? {
898                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
899                        other => Err(ParseError::new(
900                            format!(
901                                "internal: LIST VAULT produced unexpected query kind {other:?}"
902                            ),
903                            self.position(),
904                        )),
905                    }
906                } else {
907                    Err(ParseError::expected(
908                        vec!["CONFIG", "VAULT"],
909                        self.peek(),
910                        self.position(),
911                    ))
912                }
913            }
914            Token::Ident(name) if name.eq_ignore_ascii_case("INVALIDATE") => {
915                if matches!(
916                    self.peek_next()?,
917                    Token::Ident(next) if next.eq_ignore_ascii_case("CONFIG")
918                ) {
919                    match self.parse_config_command()? {
920                        QueryExpr::ConfigCommand(command) => {
921                            Ok(FrontendStatement::ConfigCommand(command))
922                        }
923                        other => Err(ParseError::new(
924                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
925                            self.position(),
926                        )),
927                    }
928                } else {
929                    self.advance()?;
930                    match self.parse_kv_invalidate_tags_after_invalidate()? {
931                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
932                        other => Err(ParseError::new(
933                            format!(
934                                "internal: INVALIDATE produced unexpected query kind {other:?}"
935                            ),
936                            self.position(),
937                        )),
938                    }
939                }
940            }
941            Token::Attach | Token::Detach => self.parse_sql_statement().map(FrontendStatement::Sql),
942            Token::Match => match self.parse_match_query()? {
943                QueryExpr::Graph(query) => Ok(FrontendStatement::Graph(query)),
944                other => Err(ParseError::new(
945                    format!("internal: MATCH produced unexpected query kind {other:?}"),
946                    self.position(),
947                )),
948            },
949            Token::Path => match self.parse_path_query()? {
950                QueryExpr::Path(query) => Ok(FrontendStatement::Path(query)),
951                other => Err(ParseError::new(
952                    format!("internal: PATH produced unexpected query kind {other:?}"),
953                    self.position(),
954                )),
955            },
956            Token::Vector => match self.parse_vector_query()? {
957                QueryExpr::Vector(query) => Ok(FrontendStatement::Vector(query)),
958                other => Err(ParseError::new(
959                    format!("internal: VECTOR produced unexpected query kind {other:?}"),
960                    self.position(),
961                )),
962            },
963            Token::Hybrid => match self.parse_hybrid_query()? {
964                QueryExpr::Hybrid(query) => Ok(FrontendStatement::Hybrid(query)),
965                other => Err(ParseError::new(
966                    format!("internal: HYBRID produced unexpected query kind {other:?}"),
967                    self.position(),
968                )),
969            },
970            Token::Graph => match self.parse_graph_command()? {
971                QueryExpr::GraphCommand(command) => Ok(FrontendStatement::GraphCommand(command)),
972                other => Err(ParseError::new(
973                    format!("internal: GRAPH produced unexpected query kind {other:?}"),
974                    self.position(),
975                )),
976            },
977            Token::Search => match self.parse_search_command()? {
978                QueryExpr::SearchCommand(command) => Ok(FrontendStatement::Search(command)),
979                other => Err(ParseError::new(
980                    format!("internal: SEARCH produced unexpected query kind {other:?}"),
981                    self.position(),
982                )),
983            },
984            Token::Ident(name) if name.eq_ignore_ascii_case("ASK") => {
985                match self.parse_ask_query()? {
986                    QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
987                    other => Err(ParseError::new(
988                        format!("internal: ASK produced unexpected query kind {other:?}"),
989                        self.position(),
990                    )),
991                }
992            }
993            Token::Ident(name) if name.eq_ignore_ascii_case("UNSEAL") => {
994                match self.parse_unseal_vault_command()? {
995                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
996                    other => Err(ParseError::new(
997                        format!("internal: UNSEAL VAULT produced unexpected query kind {other:?}"),
998                        self.position(),
999                    )),
1000                }
1001            }
1002            Token::Queue => match self.parse_queue_command()? {
1003                QueryExpr::QueueCommand(command) => Ok(FrontendStatement::QueueCommand(command)),
1004                other => Err(ParseError::new(
1005                    format!("internal: QUEUE produced unexpected query kind {other:?}"),
1006                    self.position(),
1007                )),
1008            },
1009            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
1010                match self.parse_events_command()? {
1011                    QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
1012                        SqlQuery::Select(query),
1013                    ))),
1014                    QueryExpr::EventsBackfill(query) => {
1015                        Ok(FrontendStatement::EventsBackfill(query))
1016                    }
1017                    QueryExpr::EventsBackfillStatus { collection } => {
1018                        Ok(FrontendStatement::EventsBackfillStatus { collection })
1019                    }
1020                    other => Err(ParseError::new(
1021                        format!("internal: EVENTS produced unexpected query kind {other:?}"),
1022                        self.position(),
1023                    )),
1024                }
1025            }
1026            Token::Kv => match self.parse_kv_command()? {
1027                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1028                other => Err(ParseError::new(
1029                    format!("internal: KV produced unexpected query kind {other:?}"),
1030                    self.position(),
1031                )),
1032            },
1033            Token::Delete => {
1034                if matches!(
1035                    self.peek_next()?,
1036                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
1037                ) {
1038                    match self.parse_config_command()? {
1039                        QueryExpr::ConfigCommand(command) => {
1040                            Ok(FrontendStatement::ConfigCommand(command))
1041                        }
1042                        other => Err(ParseError::new(
1043                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1044                            self.position(),
1045                        )),
1046                    }
1047                } else if matches!(
1048                    self.peek_next()?,
1049                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
1050                ) {
1051                    match self.parse_vault_lifecycle_command()? {
1052                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1053                        other => Err(ParseError::new(
1054                            format!("internal: VAULT produced unexpected query kind {other:?}"),
1055                            self.position(),
1056                        )),
1057                    }
1058                } else {
1059                    self.parse_sql_statement().map(FrontendStatement::Sql)
1060                }
1061            }
1062            Token::Add => match self.parse_config_command()? {
1063                QueryExpr::ConfigCommand(command) => Ok(FrontendStatement::ConfigCommand(command)),
1064                other => Err(ParseError::new(
1065                    format!("internal: CONFIG produced unexpected query kind {other:?}"),
1066                    self.position(),
1067                )),
1068            },
1069            Token::Purge => match self.parse_vault_lifecycle_command()? {
1070                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1071                other => Err(ParseError::new(
1072                    format!("internal: VAULT produced unexpected query kind {other:?}"),
1073                    self.position(),
1074                )),
1075            },
1076            Token::Ident(name)
1077                if name.eq_ignore_ascii_case("PUT")
1078                    || name.eq_ignore_ascii_case("GET")
1079                    || name.eq_ignore_ascii_case("RESOLVE")
1080                    || name.eq_ignore_ascii_case("ROTATE")
1081                    || name.eq_ignore_ascii_case("HISTORY")
1082                    || name.eq_ignore_ascii_case("PURGE")
1083                    || name.eq_ignore_ascii_case("INCR")
1084                    || name.eq_ignore_ascii_case("DECR")
1085                    || name.eq_ignore_ascii_case("INVALIDATE") =>
1086            {
1087                if matches!(
1088                    self.peek_next()?,
1089                    Token::Ident(next) if next.eq_ignore_ascii_case("VAULT")
1090                ) {
1091                    match self.parse_vault_lifecycle_command()? {
1092                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1093                        other => Err(ParseError::new(
1094                            format!("internal: VAULT produced unexpected query kind {other:?}"),
1095                            self.position(),
1096                        )),
1097                    }
1098                } else {
1099                    match self.parse_config_command()? {
1100                        QueryExpr::ConfigCommand(command) => {
1101                            Ok(FrontendStatement::ConfigCommand(command))
1102                        }
1103                        other => Err(ParseError::new(
1104                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
1105                            self.position(),
1106                        )),
1107                    }
1108                }
1109            }
1110            Token::Ident(name) if name.eq_ignore_ascii_case("VAULT") => {
1111                match self.parse_vault_command()? {
1112                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1113                    other => Err(ParseError::new(
1114                        format!("internal: VAULT produced unexpected query kind {other:?}"),
1115                        self.position(),
1116                    )),
1117                }
1118            }
1119            Token::Tree => match self.parse_tree_command()? {
1120                QueryExpr::TreeCommand(command) => Ok(FrontendStatement::TreeCommand(command)),
1121                other => Err(ParseError::new(
1122                    format!("internal: TREE produced unexpected query kind {other:?}"),
1123                    self.position(),
1124                )),
1125            },
1126            Token::Ident(name) if name.eq_ignore_ascii_case("HLL") => {
1127                match self.parse_hll_command()? {
1128                    QueryExpr::ProbabilisticCommand(command) => {
1129                        Ok(FrontendStatement::ProbabilisticCommand(command))
1130                    }
1131                    other => Err(ParseError::new(
1132                        format!("internal: HLL produced unexpected query kind {other:?}"),
1133                        self.position(),
1134                    )),
1135                }
1136            }
1137            Token::Ident(name) if name.eq_ignore_ascii_case("SKETCH") => {
1138                match self.parse_sketch_command()? {
1139                    QueryExpr::ProbabilisticCommand(command) => {
1140                        Ok(FrontendStatement::ProbabilisticCommand(command))
1141                    }
1142                    other => Err(ParseError::new(
1143                        format!("internal: SKETCH produced unexpected query kind {other:?}"),
1144                        self.position(),
1145                    )),
1146                }
1147            }
1148            Token::Ident(name) if name.eq_ignore_ascii_case("FILTER") => {
1149                match self.parse_filter_command()? {
1150                    QueryExpr::ProbabilisticCommand(command) => {
1151                        Ok(FrontendStatement::ProbabilisticCommand(command))
1152                    }
1153                    other => Err(ParseError::new(
1154                        format!("internal: FILTER produced unexpected query kind {other:?}"),
1155                        self.position(),
1156                    )),
1157                }
1158            }
1159            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => self
1160                .parse_sql_command()
1161                .map(SqlCommand::into_statement)
1162                .map(FrontendStatement::Sql),
1163            other => Err(ParseError::expected(
1164                vec![
1165                    "SELECT", "MATCH", "PATH", "FROM", "VECTOR", "HYBRID", "INSERT", "UPDATE",
1166                    "DELETE", "TRUNCATE", "CREATE", "DROP", "ALTER", "GRAPH", "SEARCH", "ASK",
1167                    "QUEUE", "EVENTS", "KV", "HLL", "TREE", "SKETCH", "FILTER", "SET", "SHOW",
1168                    "DESCRIBE", "DESC",
1169                ],
1170                other,
1171                self.position(),
1172            )),
1173        }
1174    }
1175
1176    /// Parse any SQL/RQL-style command into the canonical SQL frontend IR.
1177    pub fn parse_sql_statement(&mut self) -> Result<SqlStatement, ParseError> {
1178        self.parse_sql_command().map(SqlCommand::into_statement)
1179    }
1180
1181    fn parse_dotted_admin_path(&mut self, lowercase: bool) -> Result<String, ParseError> {
1182        let mut path = self.expect_ident()?;
1183        while self.consume(&Token::Dot)? {
1184            let next = self.expect_ident_or_keyword()?;
1185            path = format!("{path}.{next}");
1186        }
1187        Ok(if lowercase {
1188            path.to_ascii_lowercase()
1189        } else {
1190            path
1191        })
1192    }
1193
1194    /// Parse any SQL/RQL-style command through a single frontend module.
1195    /// Parse a `CREATE ...` statement. Split out of
1196    /// [`parse_sql_command`] so its very large per-arm locals
1197    /// (every CREATE variant's query struct) live in their own
1198    /// stack frame instead of inflating the dispatcher's frame.
1199    /// `parse_sql_command` recurses (CREATE VIEW ... AS <stmt>,
1200    /// nested subqueries), so a fat dispatcher frame stacked on
1201    /// itself overflowed small (2 MiB) worker-thread stacks (#635).
1202    #[inline(never)]
1203    fn parse_create_command(&mut self) -> Result<SqlCommand, ParseError> {
1204        let pos = self.position();
1205        self.advance()?;
1206
1207        // CREATE [OR REPLACE] [MATERIALIZED] VIEW [IF NOT EXISTS] name AS <select>
1208        // Detect the VIEW path early so OR REPLACE / MATERIALIZED modifiers
1209        // don't collide with other CREATE variants (TABLE, INDEX, etc.).
1210        let mut or_replace = false;
1211        if self.consume(&Token::Or)? || self.consume_ident_ci("OR")? {
1212            let _ = self.consume_ident_ci("REPLACE")?;
1213            or_replace = true;
1214        }
1215        let materialized = self.consume(&Token::Materialized)?;
1216        if self.check(&Token::View) {
1217            self.advance()?;
1218            let if_not_exists = self.match_if_not_exists()?;
1219            let name = self.expect_ident()?;
1220            // Issue #584 slice 12 — `WITH RETENTION <duration>`
1221            // on CREATE MATERIALIZED VIEW. Parsed before `AS`
1222            // so the SELECT body parser cannot consume the
1223            // trailing `WITH` for its own (TTL / METADATA /
1224            // …) clauses. Persisted on the view definition;
1225            // the physical sweep against view-backing rows
1226            // activates with the slice-9 row-storage follow-up.
1227            let mut retention_duration_ms: Option<u64> = None;
1228            if self.check(&Token::With) {
1229                self.advance()?;
1230                if !self.consume(&Token::Retention)? && !self.consume_ident_ci("RETENTION")? {
1231                    return Err(ParseError::expected(
1232                        vec!["RETENTION"],
1233                        self.peek(),
1234                        self.position(),
1235                    ));
1236                }
1237                if !materialized {
1238                    return Err(ParseError::new(
1239                        "WITH RETENTION is only valid on \
1240                                 CREATE MATERIALIZED VIEW"
1241                            .to_string(),
1242                        self.position(),
1243                    ));
1244                }
1245                let value = self.parse_float()?;
1246                let unit_mult = self.parse_duration_unit()?;
1247                retention_duration_ms = Some((value * unit_mult).round() as u64);
1248            }
1249            // Accept `AS` — the lexer promotes it to `Token::As`
1250            // (keyword) but some paths still see it as an ident.
1251            if !self.consume(&Token::As)? && !self.consume_ident_ci("AS")? {
1252                return Err(ParseError::expected(
1253                    vec!["AS"],
1254                    self.peek(),
1255                    self.position(),
1256                ));
1257            }
1258            // Recursive parse of the body. Any QueryExpr that the
1259            // rest of the grammar accepts is valid (Select, Join, etc.).
1260            let body = self.parse_sql_command()?.into_query_expr();
1261            // Optional `REFRESH EVERY <duration>` clause on
1262            // materialized views (issue #583 slice 10). The
1263            // background scheduler reads this off the view
1264            // descriptor and ticks the view on its cadence.
1265            let mut refresh_every_ms: Option<u64> = None;
1266            if self.check(&Token::Refresh) {
1267                if !materialized {
1268                    return Err(ParseError::new(
1269                        "REFRESH EVERY is only valid on \
1270                                 CREATE MATERIALIZED VIEW"
1271                            .to_string(),
1272                        self.position(),
1273                    ));
1274                }
1275                self.advance()?;
1276                if !self.consume_ident_ci("EVERY")? {
1277                    return Err(ParseError::expected(
1278                        vec!["EVERY"],
1279                        self.peek(),
1280                        self.position(),
1281                    ));
1282                }
1283                let value = self.parse_float()?;
1284                let unit_mult = self.parse_duration_unit()?;
1285                refresh_every_ms = Some((value * unit_mult).round() as u64);
1286            }
1287            return Ok(SqlCommand::CreateView(CreateViewQuery {
1288                name,
1289                query: Box::new(body),
1290                materialized,
1291                if_not_exists,
1292                or_replace,
1293                refresh_every_ms,
1294                retention_duration_ms,
1295            }));
1296        }
1297        // If OR REPLACE / MATERIALIZED was consumed but VIEW was not,
1298        // bail out — no other CREATE form accepts those modifiers.
1299        if or_replace || materialized {
1300            return Err(ParseError::expected(
1301                vec!["VIEW"],
1302                self.peek(),
1303                self.position(),
1304            ));
1305        }
1306
1307        if self.check(&Token::Index) || self.check(&Token::Unique) {
1308            match self.parse_create_index_query()? {
1309                QueryExpr::CreateIndex(query) => Ok(SqlCommand::CreateIndex(query)),
1310                other => Err(ParseError::new(
1311                    format!("internal: CREATE INDEX produced unexpected kind {other:?}"),
1312                    self.position(),
1313                )),
1314            }
1315        } else if self.check(&Token::Table) {
1316            self.expect(Token::Table)?;
1317            match self.parse_create_table_body()? {
1318                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1319                other => Err(ParseError::new(
1320                    format!("internal: CREATE TABLE produced unexpected kind {other:?}"),
1321                    self.position(),
1322                )),
1323            }
1324        } else if self.check(&Token::Graph) {
1325            self.advance()?;
1326            match self.parse_create_collection_model_body(CollectionModel::Graph)? {
1327                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1328                other => Err(ParseError::new(
1329                    format!("internal: CREATE GRAPH produced unexpected kind {other:?}"),
1330                    self.position(),
1331                )),
1332            }
1333        } else if self.check(&Token::Document) {
1334            self.advance()?;
1335            match self.parse_create_collection_model_body(CollectionModel::Document)? {
1336                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1337                other => Err(ParseError::new(
1338                    format!("internal: CREATE DOCUMENT produced unexpected kind {other:?}"),
1339                    self.position(),
1340                )),
1341            }
1342        } else if self.check(&Token::Vector) {
1343            self.advance()?;
1344            match self.parse_create_vector_body()? {
1345                QueryExpr::CreateVector(query) => Ok(SqlCommand::CreateVector(query)),
1346                other => Err(ParseError::new(
1347                    format!("internal: CREATE VECTOR produced unexpected kind {other:?}"),
1348                    self.position(),
1349                )),
1350            }
1351        } else if self.check(&Token::Collection) {
1352            self.advance()?;
1353            match self.parse_create_collection_body()? {
1354                QueryExpr::CreateCollection(query) => Ok(SqlCommand::CreateCollection(query)),
1355                other => Err(ParseError::new(
1356                    format!("internal: CREATE COLLECTION produced unexpected kind {other:?}"),
1357                    self.position(),
1358                )),
1359            }
1360        } else if self.check(&Token::Kv) {
1361            self.advance()?;
1362            match self.parse_create_keyed_body(CollectionModel::Kv)? {
1363                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1364                other => Err(ParseError::new(
1365                    format!("internal: CREATE KV produced unexpected kind {other:?}"),
1366                    self.position(),
1367                )),
1368            }
1369        } else if self.consume_ident_ci("CONFIG")? {
1370            match self.parse_create_keyed_body(CollectionModel::Config)? {
1371                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1372                other => Err(ParseError::new(
1373                    format!("internal: CREATE CONFIG produced unexpected kind {other:?}"),
1374                    self.position(),
1375                )),
1376            }
1377        } else if self.consume_ident_ci("VAULT")? {
1378            match self.parse_create_keyed_body(CollectionModel::Vault)? {
1379                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1380                other => Err(ParseError::new(
1381                    format!("internal: CREATE VAULT produced unexpected kind {other:?}"),
1382                    self.position(),
1383                )),
1384            }
1385        } else if self.check(&Token::Timeseries) {
1386            self.advance()?;
1387            match self.parse_create_timeseries_body()? {
1388                QueryExpr::CreateTimeSeries(query) => Ok(SqlCommand::CreateTimeSeries(query)),
1389                other => Err(ParseError::new(
1390                    format!("internal: CREATE TIMESERIES produced unexpected kind {other:?}"),
1391                    self.position(),
1392                )),
1393            }
1394        } else if self.consume_ident_ci("METRICS")? {
1395            match self.parse_create_metrics_body()? {
1396                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1397                other => Err(ParseError::new(
1398                    format!("internal: CREATE METRICS produced unexpected kind {other:?}"),
1399                    self.position(),
1400                )),
1401            }
1402        } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE")) {
1403            self.advance()?;
1404            match self.parse_create_hypertable_body()? {
1405                QueryExpr::CreateTimeSeries(query) => Ok(SqlCommand::CreateTimeSeries(query)),
1406                other => Err(ParseError::new(
1407                    format!("internal: CREATE HYPERTABLE produced unexpected kind {other:?}"),
1408                    self.position(),
1409                )),
1410            }
1411        } else if self.check(&Token::Queue) {
1412            self.advance()?;
1413            match self.parse_create_queue_body()? {
1414                QueryExpr::CreateQueue(query) => Ok(SqlCommand::CreateQueue(query)),
1415                other => Err(ParseError::new(
1416                    format!("internal: CREATE QUEUE produced unexpected kind {other:?}"),
1417                    self.position(),
1418                )),
1419            }
1420        } else if self.check(&Token::Tree) {
1421            self.advance()?;
1422            match self.parse_create_tree_body()? {
1423                QueryExpr::CreateTree(query) => Ok(SqlCommand::CreateTree(query)),
1424                other => Err(ParseError::new(
1425                    format!("internal: CREATE TREE produced unexpected kind {other:?}"),
1426                    self.position(),
1427                )),
1428            }
1429        } else if matches!(self.peek(), Token::Ident(n) if
1430                    n.eq_ignore_ascii_case("HLL") ||
1431                    n.eq_ignore_ascii_case("SKETCH") ||
1432                    n.eq_ignore_ascii_case("FILTER"))
1433        {
1434            match self.parse_create_probabilistic()? {
1435                QueryExpr::ProbabilisticCommand(command) => Ok(SqlCommand::Probabilistic(command)),
1436                other => Err(ParseError::new(
1437                    format!("internal: CREATE probabilistic produced unexpected kind {other:?}"),
1438                    self.position(),
1439                )),
1440            }
1441        } else if self.check(&Token::Schema) {
1442            // CREATE SCHEMA [IF NOT EXISTS] name
1443            self.advance()?;
1444            let if_not_exists = self.match_if_not_exists()?;
1445            let name = self.expect_ident()?;
1446            Ok(SqlCommand::CreateSchema(CreateSchemaQuery {
1447                name,
1448                if_not_exists,
1449            }))
1450        } else if self.check(&Token::Policy) {
1451            // Two forms share the leading `CREATE POLICY` tokens:
1452            //   * IAM:   CREATE POLICY '<id>' AS '<json>'          (string literal id)
1453            //   * RLS:   CREATE POLICY <name> ON <target> ...      (bare ident name)
1454            // Disambiguate by peeking the token after POLICY.
1455            self.advance()?;
1456            if matches!(self.peek(), Token::String(_)) {
1457                // IAM form — short-circuit out of the SQL command stack.
1458                let expr = self.parse_create_iam_policy_after_keywords()?;
1459                // Wrap the parsed IAM policy expression in the dedicated
1460                // `SqlCommand::IamPolicy` variant and return right away:
1461                // the RLS-specific parsing below (ON <target>, FOR, TO,
1462                // USING) applies only to the bare-identifier form and
1463                // must not run for the string-literal IAM form.
1464                return Ok(SqlCommand::IamPolicy(expr));
1465            }
1466            let name = self.expect_ident()?;
1467            self.expect(Token::On)?;
1468
1469            let (target_kind, table) = {
1470                use crate::storage::query::ast::PolicyTargetKind;
1471                let kw = match self.peek() {
1472                    Token::Ident(s) => Some(s.to_ascii_uppercase()),
1473                    _ => None,
1474                };
1475                let kind = kw.as_deref().and_then(|k| match k {
1476                    "NODES" => Some(PolicyTargetKind::Nodes),
1477                    "EDGES" => Some(PolicyTargetKind::Edges),
1478                    "VECTORS" => Some(PolicyTargetKind::Vectors),
1479                    "MESSAGES" => Some(PolicyTargetKind::Messages),
1480                    "POINTS" => Some(PolicyTargetKind::Points),
1481                    "DOCUMENTS" => Some(PolicyTargetKind::Documents),
1482                    _ => None,
1483                });
1484                if let Some(k) = kind {
1485                    self.advance()?;
1486                    self.expect(Token::Of)?;
1487                    let coll = self.expect_ident()?;
1488                    (k, coll)
1489                } else {
1490                    let coll = self.expect_ident()?;
1491                    (PolicyTargetKind::Table, coll)
1492                }
1493            };
1494
1495            let action = if self.consume(&Token::For)? {
1496                let a = match self.peek() {
1497                    Token::Select => {
1498                        self.advance()?;
1499                        Some(PolicyAction::Select)
1500                    }
1501                    Token::Insert => {
1502                        self.advance()?;
1503                        Some(PolicyAction::Insert)
1504                    }
1505                    Token::Update => {
1506                        self.advance()?;
1507                        Some(PolicyAction::Update)
1508                    }
1509                    Token::Delete => {
1510                        self.advance()?;
1511                        Some(PolicyAction::Delete)
1512                    }
1513                    Token::All => {
1514                        self.advance()?;
1515                        None
1516                    }
1517                    _ => None,
1518                };
1519                a
1520            } else {
1521                None
1522            };
1523
1524            let role = if self.consume(&Token::To)? {
1525                Some(self.expect_ident()?)
1526            } else {
1527                None
1528            };
1529
1530            self.expect(Token::Using)?;
1531            self.expect(Token::LParen)?;
1532            let filter = self.parse_filter()?;
1533            self.expect(Token::RParen)?;
1534
1535            Ok(SqlCommand::CreatePolicy(CreatePolicyQuery {
1536                name,
1537                table,
1538                action,
1539                role,
1540                using: Box::new(filter),
1541                target_kind,
1542            }))
1543        } else if self.check(&Token::Server) {
1544            // CREATE SERVER [IF NOT EXISTS] name
1545            //   FOREIGN DATA WRAPPER kind
1546            //   [OPTIONS (key 'value', ...)]
1547            self.advance()?;
1548            let if_not_exists = self.match_if_not_exists()?;
1549            let name = self.expect_ident()?;
1550            self.expect(Token::Foreign)?;
1551            self.expect(Token::Data)?;
1552            self.expect(Token::Wrapper)?;
1553            let wrapper = self.expect_ident()?;
1554            let options = self.parse_fdw_options_clause()?;
1555            Ok(SqlCommand::CreateServer(CreateServerQuery {
1556                name,
1557                wrapper,
1558                options,
1559                if_not_exists,
1560            }))
1561        } else if self.check(&Token::Foreign) {
1562            // CREATE FOREIGN TABLE [IF NOT EXISTS] name (cols)
1563            //   SERVER server_name
1564            //   [OPTIONS (key 'value', ...)]
1565            self.advance()?;
1566            self.expect(Token::Table)?;
1567            let if_not_exists = self.match_if_not_exists()?;
1568            let name = self.expect_ident()?;
1569            self.expect(Token::LParen)?;
1570            let mut columns = Vec::new();
1571            loop {
1572                let col_name = self.expect_ident()?;
1573                let data_type = self.expect_ident_or_keyword()?;
1574                // Inline NOT NULL check — the CREATE TABLE path's helper is
1575                // private and coupling to it just for FDW columns isn't worth it.
1576                let mut not_null = false;
1577                if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NOT")) {
1578                    self.advance()?;
1579                    if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NULL")) {
1580                        self.advance()?;
1581                        not_null = true;
1582                    }
1583                }
1584                columns.push(ForeignColumnDef {
1585                    name: col_name,
1586                    data_type,
1587                    not_null,
1588                });
1589                if !self.consume(&Token::Comma)? {
1590                    break;
1591                }
1592            }
1593            self.expect(Token::RParen)?;
1594            self.expect(Token::Server)?;
1595            let server = self.expect_ident()?;
1596            let options = self.parse_fdw_options_clause()?;
1597            Ok(SqlCommand::CreateForeignTable(CreateForeignTableQuery {
1598                name,
1599                server,
1600                columns,
1601                options,
1602                if_not_exists,
1603            }))
1604        } else if self.check(&Token::Sequence) {
1605            // CREATE SEQUENCE [IF NOT EXISTS] name
1606            //   [START [WITH] n] [INCREMENT [BY] n]
1607            self.advance()?;
1608            let if_not_exists = self.match_if_not_exists()?;
1609            let name = self.expect_ident()?;
1610            let mut start: i64 = 1;
1611            let mut increment: i64 = 1;
1612            // Loop over optional clauses in any order.
1613            loop {
1614                if self.consume(&Token::Start)? {
1615                    // Accept `START 100` or `START WITH 100`.
1616                    let _ = self.consume(&Token::With)? || self.consume_ident_ci("WITH")?;
1617                    start = self.parse_integer()?;
1618                } else if self.consume(&Token::Increment)? {
1619                    // Accept `INCREMENT 5` or `INCREMENT BY 5`.
1620                    let _ = self.consume(&Token::By)? || self.consume_ident_ci("BY")?;
1621                    increment = self.parse_integer()?;
1622                } else {
1623                    break;
1624                }
1625            }
1626            Ok(SqlCommand::CreateSequence(CreateSequenceQuery {
1627                name,
1628                if_not_exists,
1629                start,
1630                increment,
1631            }))
1632        } else if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
1633            self.advance()?; // consume MIGRATION
1634            match self.parse_create_migration_body()? {
1635                QueryExpr::CreateMigration(q) => Ok(SqlCommand::CreateMigration(q)),
1636                other => Err(ParseError::new(
1637                    format!("internal: CREATE MIGRATION produced unexpected kind {other:?}"),
1638                    self.position(),
1639                )),
1640            }
1641        } else if let Some(err) =
1642            ParseError::unsupported_recognized_token(self.peek(), self.position())
1643        {
1644            Err(err)
1645        } else {
1646            Err(ParseError::expected(
1647                vec![
1648                    "TABLE",
1649                    "GRAPH",
1650                    "VECTOR",
1651                    "DOCUMENT",
1652                    "KV",
1653                    "COLLECTION",
1654                    "INDEX",
1655                    "UNIQUE",
1656                    "TIMESERIES",
1657                    "QUEUE",
1658                    "TREE",
1659                    "HLL",
1660                    "SKETCH",
1661                    "FILTER",
1662                    "SCHEMA",
1663                    "SEQUENCE",
1664                    "MIGRATION",
1665                ],
1666                self.peek(),
1667                pos,
1668            ))
1669        }
1670    }
1671
1672    pub fn parse_sql_command(&mut self) -> Result<SqlCommand, ParseError> {
1673        match self.peek() {
1674            Token::Select => match self.parse_select_query()? {
1675                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1676                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1677                other => Err(ParseError::new(
1678                    format!("internal: SELECT produced unexpected query kind {other:?}"),
1679                    self.position(),
1680                )),
1681            },
1682            Token::From => match self.parse_from_query()? {
1683                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1684                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1685                other => Err(ParseError::new(
1686                    format!("internal: FROM produced unexpected query kind {other:?}"),
1687                    self.position(),
1688                )),
1689            },
1690            Token::Insert => match self.parse_insert_query()? {
1691                QueryExpr::Insert(query) => Ok(SqlCommand::Insert(query)),
1692                other => Err(ParseError::new(
1693                    format!("internal: INSERT produced unexpected query kind {other:?}"),
1694                    self.position(),
1695                )),
1696            },
1697            Token::Update => match self.parse_update_query()? {
1698                QueryExpr::Update(query) => Ok(SqlCommand::Update(query)),
1699                other => Err(ParseError::new(
1700                    format!("internal: UPDATE produced unexpected query kind {other:?}"),
1701                    self.position(),
1702                )),
1703            },
1704            Token::Delete => {
1705                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("SECRET"))
1706                {
1707                    self.advance()?; // DELETE
1708                    self.advance()?; // SECRET
1709                    let key = self.parse_dotted_admin_path(true)?;
1710                    Ok(SqlCommand::DeleteSecret { key })
1711                } else {
1712                    match self.parse_delete_query()? {
1713                        QueryExpr::Delete(query) => Ok(SqlCommand::Delete(query)),
1714                        other => Err(ParseError::new(
1715                            format!("internal: DELETE produced unexpected query kind {other:?}"),
1716                            self.position(),
1717                        )),
1718                    }
1719                }
1720            }
1721            Token::Truncate => {
1722                self.advance()?;
1723                let model = if self.consume(&Token::Table)? {
1724                    Some(CollectionModel::Table)
1725                } else if self.consume(&Token::Graph)? {
1726                    Some(CollectionModel::Graph)
1727                } else if self.consume(&Token::Vector)? {
1728                    Some(CollectionModel::Vector)
1729                } else if self.consume(&Token::Document)? {
1730                    Some(CollectionModel::Document)
1731                } else if self.consume(&Token::Timeseries)? {
1732                    Some(CollectionModel::TimeSeries)
1733                } else if self.consume_ident_ci("METRICS")? {
1734                    Some(CollectionModel::Metrics)
1735                } else if self.consume(&Token::Kv)? {
1736                    Some(CollectionModel::Kv)
1737                } else if self.consume(&Token::Queue)? {
1738                    Some(CollectionModel::Queue)
1739                } else if self.consume(&Token::Collection)? {
1740                    None
1741                } else {
1742                    return Err(ParseError::expected(
1743                        vec![
1744                            "TABLE",
1745                            "GRAPH",
1746                            "VECTOR",
1747                            "DOCUMENT",
1748                            "TIMESERIES",
1749                            "METRICS",
1750                            "KV",
1751                            "QUEUE",
1752                            "COLLECTION",
1753                        ],
1754                        self.peek(),
1755                        self.position(),
1756                    ));
1757                };
1758                match self.parse_truncate_body(model)? {
1759                    QueryExpr::Truncate(query) => Ok(SqlCommand::Truncate(query)),
1760                    other => Err(ParseError::new(
1761                        format!("internal: TRUNCATE produced unexpected kind {other:?}"),
1762                        self.position(),
1763                    )),
1764                }
1765            }
1766            Token::Explain => {
1767                // Peek ahead: EXPLAIN MIGRATION name → ExplainMigration
1768                // EXPLAIN ALTER FOR ... → ExplainAlter (existing path)
1769                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
1770                {
1771                    self.advance()?; // consume EXPLAIN
1772                    match self.parse_explain_migration_after_keyword()? {
1773                        QueryExpr::ExplainMigration(q) => Ok(SqlCommand::ExplainMigration(q)),
1774                        other => Err(ParseError::new(
1775                            format!(
1776                                "internal: EXPLAIN MIGRATION produced unexpected kind {other:?}"
1777                            ),
1778                            self.position(),
1779                        )),
1780                    }
1781                } else {
1782                    match self.parse_explain_alter_query()? {
1783                        QueryExpr::ExplainAlter(query) => Ok(SqlCommand::ExplainAlter(query)),
1784                        other => Err(ParseError::new(
1785                            format!("internal: EXPLAIN produced unexpected query kind {other:?}"),
1786                            self.position(),
1787                        )),
1788                    }
1789                }
1790            }
1791            Token::Create => self.parse_create_command(),
1792            Token::Drop => {
1793                let pos = self.position();
1794                self.advance()?;
1795
1796                // DROP [MATERIALIZED] VIEW [IF EXISTS] name
1797                let materialized = self.consume(&Token::Materialized)?;
1798                if self.check(&Token::View) {
1799                    self.advance()?;
1800                    let if_exists = self.match_if_exists()?;
1801                    let name = self.expect_ident()?;
1802                    return Ok(SqlCommand::DropView(DropViewQuery {
1803                        name,
1804                        materialized,
1805                        if_exists,
1806                    }));
1807                }
1808                if materialized {
1809                    return Err(ParseError::expected(
1810                        vec!["VIEW"],
1811                        self.peek(),
1812                        self.position(),
1813                    ));
1814                }
1815
1816                if self.check(&Token::Index) {
1817                    match self.parse_drop_index_query()? {
1818                        QueryExpr::DropIndex(query) => Ok(SqlCommand::DropIndex(query)),
1819                        other => Err(ParseError::new(
1820                            format!("internal: DROP INDEX produced unexpected kind {other:?}"),
1821                            self.position(),
1822                        )),
1823                    }
1824                } else if self.check(&Token::Table) {
1825                    self.expect(Token::Table)?;
1826                    match self.parse_drop_table_body()? {
1827                        QueryExpr::DropTable(query) => Ok(SqlCommand::DropTable(query)),
1828                        other => Err(ParseError::new(
1829                            format!("internal: DROP TABLE produced unexpected kind {other:?}"),
1830                            self.position(),
1831                        )),
1832                    }
1833                } else if self.check(&Token::Graph) {
1834                    self.advance()?;
1835                    match self.parse_drop_graph_body()? {
1836                        QueryExpr::DropGraph(query) => Ok(SqlCommand::DropGraph(query)),
1837                        other => Err(ParseError::new(
1838                            format!("internal: DROP GRAPH produced unexpected kind {other:?}"),
1839                            self.position(),
1840                        )),
1841                    }
1842                } else if self.check(&Token::Vector) {
1843                    self.advance()?;
1844                    match self.parse_drop_vector_body()? {
1845                        QueryExpr::DropVector(query) => Ok(SqlCommand::DropVector(query)),
1846                        other => Err(ParseError::new(
1847                            format!("internal: DROP VECTOR produced unexpected kind {other:?}"),
1848                            self.position(),
1849                        )),
1850                    }
1851                } else if self.check(&Token::Document) {
1852                    self.advance()?;
1853                    match self.parse_drop_document_body()? {
1854                        QueryExpr::DropDocument(query) => Ok(SqlCommand::DropDocument(query)),
1855                        other => Err(ParseError::new(
1856                            format!("internal: DROP DOCUMENT produced unexpected kind {other:?}"),
1857                            self.position(),
1858                        )),
1859                    }
1860                } else if self.check(&Token::Kv) {
1861                    self.advance()?;
1862                    match self.parse_drop_kv_body()? {
1863                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1864                        other => Err(ParseError::new(
1865                            format!("internal: DROP KV produced unexpected kind {other:?}"),
1866                            self.position(),
1867                        )),
1868                    }
1869                } else if self.consume_ident_ci("CONFIG")? {
1870                    match self.parse_drop_keyed_body(CollectionModel::Config)? {
1871                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1872                        other => Err(ParseError::new(
1873                            format!("internal: DROP CONFIG produced unexpected kind {other:?}"),
1874                            self.position(),
1875                        )),
1876                    }
1877                } else if self.consume_ident_ci("VAULT")? {
1878                    match self.parse_drop_keyed_body(CollectionModel::Vault)? {
1879                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
1880                        other => Err(ParseError::new(
1881                            format!("internal: DROP VAULT produced unexpected kind {other:?}"),
1882                            self.position(),
1883                        )),
1884                    }
1885                } else if self.check(&Token::Collection) {
1886                    self.advance()?;
1887                    match self.parse_drop_collection_body()? {
1888                        QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
1889                        other => Err(ParseError::new(
1890                            format!("internal: DROP COLLECTION produced unexpected kind {other:?}"),
1891                            self.position(),
1892                        )),
1893                    }
1894                } else if self.check(&Token::Timeseries) {
1895                    self.advance()?;
1896                    match self.parse_drop_timeseries_body()? {
1897                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
1898                        other => Err(ParseError::new(
1899                            format!("internal: DROP TIMESERIES produced unexpected kind {other:?}"),
1900                            self.position(),
1901                        )),
1902                    }
1903                } else if self.consume_ident_ci("METRICS")? {
1904                    match self.parse_drop_collection_model_body(Some(CollectionModel::Metrics))? {
1905                        QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
1906                        other => Err(ParseError::new(
1907                            format!("internal: DROP METRICS produced unexpected kind {other:?}"),
1908                            self.position(),
1909                        )),
1910                    }
1911                } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
1912                {
1913                    // DROP HYPERTABLE name reuses the same AST as
1914                    // DROP TIMESERIES — runtime clears the registry
1915                    // entry *and* drops the backing collection.
1916                    self.advance()?;
1917                    match self.parse_drop_timeseries_body()? {
1918                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
1919                        other => Err(ParseError::new(
1920                            format!("internal: DROP HYPERTABLE produced unexpected kind {other:?}"),
1921                            self.position(),
1922                        )),
1923                    }
1924                } else if self.check(&Token::Queue) {
1925                    self.advance()?;
1926                    match self.parse_drop_queue_body()? {
1927                        QueryExpr::DropQueue(query) => Ok(SqlCommand::DropQueue(query)),
1928                        other => Err(ParseError::new(
1929                            format!("internal: DROP QUEUE produced unexpected kind {other:?}"),
1930                            self.position(),
1931                        )),
1932                    }
1933                } else if self.check(&Token::Tree) {
1934                    self.advance()?;
1935                    match self.parse_drop_tree_body()? {
1936                        QueryExpr::DropTree(query) => Ok(SqlCommand::DropTree(query)),
1937                        other => Err(ParseError::new(
1938                            format!("internal: DROP TREE produced unexpected kind {other:?}"),
1939                            self.position(),
1940                        )),
1941                    }
1942                } else if matches!(self.peek(), Token::Ident(n) if
1943                    n.eq_ignore_ascii_case("HLL") ||
1944                    n.eq_ignore_ascii_case("SKETCH") ||
1945                    n.eq_ignore_ascii_case("FILTER"))
1946                {
1947                    match self.parse_drop_probabilistic()? {
1948                        QueryExpr::ProbabilisticCommand(command) => {
1949                            Ok(SqlCommand::Probabilistic(command))
1950                        }
1951                        other => Err(ParseError::new(
1952                            format!(
1953                                "internal: DROP probabilistic produced unexpected kind {other:?}"
1954                            ),
1955                            self.position(),
1956                        )),
1957                    }
1958                } else if self.check(&Token::Schema) {
1959                    // DROP SCHEMA [IF EXISTS] name [CASCADE]
1960                    self.advance()?;
1961                    let if_exists = self.match_if_exists()?;
1962                    let name = self.expect_ident()?;
1963                    let cascade = self.consume(&Token::Cascade)?;
1964                    Ok(SqlCommand::DropSchema(DropSchemaQuery {
1965                        name,
1966                        if_exists,
1967                        cascade,
1968                    }))
1969                } else if self.check(&Token::Policy) {
1970                    // Two forms:
1971                    //   * IAM:   DROP POLICY '<id>'
1972                    //   * RLS:   DROP POLICY [IF EXISTS] name ON table
1973                    self.advance()?;
1974                    if matches!(self.peek(), Token::String(_)) {
1975                        let expr = self.parse_drop_iam_policy_after_keywords()?;
1976                        return Ok(SqlCommand::IamPolicy(expr));
1977                    }
1978                    let if_exists = self.match_if_exists()?;
1979                    let name = self.expect_ident()?;
1980                    self.expect(Token::On)?;
1981                    let table = self.expect_ident()?;
1982                    Ok(SqlCommand::DropPolicy(DropPolicyQuery {
1983                        name,
1984                        table,
1985                        if_exists,
1986                    }))
1987                } else if self.check(&Token::Server) {
1988                    // DROP SERVER [IF EXISTS] name [CASCADE]
1989                    self.advance()?;
1990                    let if_exists = self.match_if_exists()?;
1991                    let name = self.expect_ident()?;
1992                    let cascade = self.consume(&Token::Cascade)?;
1993                    Ok(SqlCommand::DropServer(DropServerQuery {
1994                        name,
1995                        if_exists,
1996                        cascade,
1997                    }))
1998                } else if self.check(&Token::Foreign) {
1999                    // DROP FOREIGN TABLE [IF EXISTS] name
2000                    self.advance()?;
2001                    self.expect(Token::Table)?;
2002                    let if_exists = self.match_if_exists()?;
2003                    let name = self.expect_ident()?;
2004                    Ok(SqlCommand::DropForeignTable(DropForeignTableQuery {
2005                        name,
2006                        if_exists,
2007                    }))
2008                } else if self.check(&Token::Sequence) {
2009                    // DROP SEQUENCE [IF EXISTS] name
2010                    self.advance()?;
2011                    let if_exists = self.match_if_exists()?;
2012                    let name = self.expect_ident()?;
2013                    Ok(SqlCommand::DropSequence(DropSequenceQuery {
2014                        name,
2015                        if_exists,
2016                    }))
2017                } else if let Some(err) =
2018                    ParseError::unsupported_recognized_token(self.peek(), self.position())
2019                {
2020                    Err(err)
2021                } else {
2022                    Err(ParseError::expected(
2023                        vec![
2024                            "TABLE",
2025                            "INDEX",
2026                            "TIMESERIES",
2027                            "QUEUE",
2028                            "TREE",
2029                            "HLL",
2030                            "SKETCH",
2031                            "FILTER",
2032                            "SCHEMA",
2033                            "SEQUENCE",
2034                        ],
2035                        self.peek(),
2036                        pos,
2037                    ))
2038                }
2039            }
2040            Token::Alter => {
2041                // Disambiguate ALTER USER / ALTER QUEUE / ALTER TABLE without
2042                // committing to a path until we've seen the target.
2043                // We peek the *next* token (without consuming) and
2044                // dispatch accordingly.
2045                let next = self.peek_next()?.clone();
2046                if matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("USER")) {
2047                    self.advance()?; // consume ALTER
2048                    let stmt = self.parse_alter_user_statement()?;
2049                    Ok(SqlCommand::AlterUser(stmt))
2050                } else if matches!(next, Token::Queue) {
2051                    self.advance()?; // consume ALTER
2052                    self.advance()?; // consume QUEUE
2053                    match self.parse_alter_queue_body()? {
2054                        QueryExpr::AlterQueue(query) => Ok(SqlCommand::AlterQueue(query)),
2055                        other => Err(ParseError::new(
2056                            format!("internal: ALTER QUEUE produced unexpected kind {other:?}"),
2057                            self.position(),
2058                        )),
2059                    }
2060                } else if matches!(next, Token::Table)
2061                    || matches!(next, Token::Collection)
2062                    || matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("COLLECTION"))
2063                {
2064                    // Issue #522 — `ALTER COLLECTION` shares the AlterTable
2065                    // AST so signer-registry mutations dispatch through the
2066                    // existing executor. The DDL parser body accepts either
2067                    // keyword interchangeably for the open-vocabulary alters
2068                    // we own (currently `ADD|REVOKE SIGNER`).
2069                    match self.parse_alter_table_query()? {
2070                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2071                        other => Err(ParseError::new(
2072                            format!(
2073                                "internal: ALTER TABLE produced unexpected query kind {other:?}"
2074                            ),
2075                            self.position(),
2076                        )),
2077                    }
2078                } else if let Some(err) =
2079                    ParseError::unsupported_recognized_token(&next, self.position())
2080                {
2081                    Err(err)
2082                } else {
2083                    match self.parse_alter_table_query()? {
2084                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2085                        other => Err(ParseError::new(
2086                            format!("internal: ALTER produced unexpected query kind {other:?}"),
2087                            self.position(),
2088                        )),
2089                    }
2090                }
2091            }
2092            Token::Ident(name) if name.eq_ignore_ascii_case("GRANT") => {
2093                let stmt = self.parse_grant_statement()?;
2094                Ok(SqlCommand::Grant(stmt))
2095            }
2096            Token::Ident(name) if name.eq_ignore_ascii_case("REVOKE") => {
2097                let stmt = self.parse_revoke_statement()?;
2098                Ok(SqlCommand::Revoke(stmt))
2099            }
2100            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
2101                self.advance()?;
2102                if self.consume_ident_ci("BACKFILL")? {
2103                    return Err(ParseError::new(
2104                        "EVENTS BACKFILL STATUS is not implemented; EVENTS BACKFILL runtime is available but durable progress tracking is not"
2105                            .to_string(),
2106                        self.position(),
2107                    ));
2108                }
2109                if !self.consume_ident_ci("STATUS")? {
2110                    return Err(ParseError::expected(
2111                        vec!["STATUS"],
2112                        self.peek(),
2113                        self.position(),
2114                    ));
2115                }
2116
2117                let mut query = TableQuery::new("red.subscriptions");
2118                let collection = match self.peek().clone() {
2119                    Token::Ident(name) => {
2120                        self.advance()?;
2121                        Some(name)
2122                    }
2123                    Token::String(name) => {
2124                        self.advance()?;
2125                        Some(name)
2126                    }
2127                    _ => None,
2128                };
2129                self.parse_table_clauses(&mut query)?;
2130                if let Some(collection) = collection {
2131                    let filter = Filter::compare(
2132                        FieldRef::column("red.subscriptions", "collection"),
2133                        CompareOp::Eq,
2134                        Value::text(collection),
2135                    );
2136                    let expr = filter_to_expr(&filter);
2137                    query.where_expr = Some(match query.where_expr.take() {
2138                        Some(existing) => Expr::binop(BinOp::And, existing, expr),
2139                        None => expr,
2140                    });
2141                    query.filter = Some(match query.filter.take() {
2142                        Some(existing) => existing.and(filter),
2143                        None => filter,
2144                    });
2145                }
2146                Ok(SqlCommand::Select(query))
2147            }
2148            Token::Attach => {
2149                let expr = self.parse_attach_policy()?;
2150                Ok(SqlCommand::IamPolicy(expr))
2151            }
2152            Token::Detach => {
2153                let expr = self.parse_detach_policy()?;
2154                Ok(SqlCommand::IamPolicy(expr))
2155            }
2156            Token::Ident(name) if name.eq_ignore_ascii_case("SIMULATE") => {
2157                let expr = self.parse_simulate_policy()?;
2158                Ok(SqlCommand::IamPolicy(expr))
2159            }
2160            Token::Set => {
2161                self.advance()?;
2162                if self.consume_ident_ci("CONFIG")? {
2163                    let full_key = self.parse_dotted_admin_path(true)?;
2164                    self.expect(Token::Eq)?;
2165                    let value = self.parse_literal_value()?;
2166                    Ok(SqlCommand::SetConfig {
2167                        key: full_key,
2168                        value,
2169                    })
2170                } else if self.consume_ident_ci("SECRET")? {
2171                    let key = self.parse_dotted_admin_path(true)?;
2172                    self.expect(Token::Eq)?;
2173                    let value = self.parse_literal_value()?;
2174                    Ok(SqlCommand::SetSecret { key, value })
2175                } else if self.consume_ident_ci("TENANT")? {
2176                    // SET TENANT 'id'  |  SET TENANT = 'id'  |
2177                    // SET TENANT NULL  |  SET TENANT = NULL
2178                    let _ = self.consume(&Token::Eq)?;
2179                    if self.consume_ident_ci("NULL")? {
2180                        Ok(SqlCommand::SetTenant(None))
2181                    } else {
2182                        let value = self.parse_literal_value()?;
2183                        match value {
2184                            Value::Text(s) => Ok(SqlCommand::SetTenant(Some(s.to_string()))),
2185                            Value::Null => Ok(SqlCommand::SetTenant(None)),
2186                            other => Err(ParseError::new(
2187                                format!("SET TENANT expects a text literal or NULL, got {other:?}"),
2188                                self.position(),
2189                            )),
2190                        }
2191                    }
2192                } else {
2193                    Err(ParseError::expected(
2194                        vec!["CONFIG", "SECRET", "TENANT"],
2195                        self.peek(),
2196                        self.position(),
2197                    ))
2198                }
2199            }
2200            Token::Ident(name) if name.eq_ignore_ascii_case("APPLY") => {
2201                self.advance()?;
2202                match self.parse_apply_migration()? {
2203                    QueryExpr::ApplyMigration(q) => Ok(SqlCommand::ApplyMigration(q)),
2204                    other => Err(ParseError::new(
2205                        format!("internal: APPLY MIGRATION produced unexpected kind {other:?}"),
2206                        self.position(),
2207                    )),
2208                }
2209            }
2210            Token::Ident(name) if name.eq_ignore_ascii_case("RESET") => {
2211                // RESET TENANT — session-local clear
2212                self.advance()?;
2213                if self.consume_ident_ci("TENANT")? {
2214                    Ok(SqlCommand::SetTenant(None))
2215                } else {
2216                    Err(ParseError::expected(
2217                        vec!["TENANT"],
2218                        self.peek(),
2219                        self.position(),
2220                    ))
2221                }
2222            }
2223            Token::Ident(name)
2224                if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
2225            {
2226                self.advance()?;
2227                let collection = self.parse_dotted_admin_path(false)?;
2228                let mut query = TableQuery::new("red.describe");
2229                query.filter = Some(Filter::compare(
2230                    FieldRef::column("", "collection"),
2231                    CompareOp::Eq,
2232                    Value::text(collection),
2233                ));
2234                Ok(SqlCommand::Select(query))
2235            }
2236            Token::Desc => {
2237                self.advance()?;
2238                let collection = self.parse_dotted_admin_path(false)?;
2239                let mut query = TableQuery::new("red.describe");
2240                query.filter = Some(Filter::compare(
2241                    FieldRef::column("", "collection"),
2242                    CompareOp::Eq,
2243                    Value::text(collection),
2244                ));
2245                Ok(SqlCommand::Select(query))
2246            }
2247            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
2248                self.advance()?;
2249                if self.consume(&Token::Create)? || self.consume_ident_ci("CREATE")? {
2250                    if !(self.consume(&Token::Table)? || self.consume_ident_ci("TABLE")?) {
2251                        return Err(ParseError::expected(
2252                            vec!["TABLE"],
2253                            self.peek(),
2254                            self.position(),
2255                        ));
2256                    }
2257                    let collection = self.parse_dotted_admin_path(false)?;
2258                    let mut query = TableQuery::new("red.show_create");
2259                    query.filter = Some(Filter::compare(
2260                        FieldRef::column("", "collection"),
2261                        CompareOp::Eq,
2262                        Value::text(collection),
2263                    ));
2264                    Ok(SqlCommand::Select(query))
2265                } else if self.consume_ident_ci("CONFIG")? {
2266                    // Accept dotted prefixes the same way SET CONFIG does
2267                    // (`SHOW CONFIG durability.mode`), and empty prefix
2268                    // (`SHOW CONFIG`) for a catalog-wide listing.
2269                    let prefix = if !self.check(&Token::Eof) {
2270                        let first = self.expect_ident()?;
2271                        let mut full = first;
2272                        while self.consume(&Token::Dot)? {
2273                            let next = self.expect_ident_or_keyword()?;
2274                            full = format!("{full}.{next}");
2275                        }
2276                        // Match SET CONFIG: lowercase so keyword segments
2277                        // come out consistent with the stored keys.
2278                        Some(full.to_ascii_lowercase())
2279                    } else {
2280                        None
2281                    };
2282                    Ok(SqlCommand::ShowConfig { prefix })
2283                } else if self.consume_ident_ci("COLLECTIONS")? {
2284                    let mut query = TableQuery::new("red.collections");
2285                    let include_internal = if self.consume_ident_ci("INCLUDING")? {
2286                        if !self.consume_ident_ci("INTERNAL")? {
2287                            return Err(ParseError::expected(
2288                                vec!["INTERNAL"],
2289                                self.peek(),
2290                                self.position(),
2291                            ));
2292                        }
2293                        true
2294                    } else {
2295                        false
2296                    };
2297                    self.parse_table_clauses(&mut query)?;
2298                    if !include_internal {
2299                        let user_filter = query.filter.take();
2300                        let hide_internal = crate::storage::query::ast::Filter::Compare {
2301                            field: FieldRef::column("", "internal"),
2302                            op: CompareOp::Eq,
2303                            value: Value::Boolean(false),
2304                        };
2305                        query.filter = Some(match user_filter {
2306                            Some(filter) => filter.and(hide_internal),
2307                            None => hide_internal,
2308                        });
2309                    }
2310                    Ok(SqlCommand::Select(query))
2311                } else if self.consume_ident_ci("TABLES")? {
2312                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2313                        self, "table",
2314                    )?))
2315                } else if self.consume_ident_ci("QUEUES")? {
2316                    // Issue #535 — `SHOW QUEUES` desugars to the
2317                    // `red.queues` virtual table (queue-shaped
2318                    // columns), not the filtered `red.collections`
2319                    // view. `INCLUDING INTERNAL` mirrors the
2320                    // `SHOW COLLECTIONS` opt-in: without it, DLQ
2321                    // targets and other auto-created queues are
2322                    // hidden via the `internal = false` filter.
2323                    let mut query = TableQuery::new("red.queues");
2324                    let include_internal = if self.consume_ident_ci("INCLUDING")? {
2325                        if !self.consume_ident_ci("INTERNAL")? {
2326                            return Err(ParseError::expected(
2327                                vec!["INTERNAL"],
2328                                self.peek(),
2329                                self.position(),
2330                            ));
2331                        }
2332                        true
2333                    } else {
2334                        false
2335                    };
2336                    self.parse_table_clauses(&mut query)?;
2337                    if !include_internal {
2338                        let hide_internal = Filter::Compare {
2339                            field: FieldRef::column("", "internal"),
2340                            op: CompareOp::Eq,
2341                            value: Value::Boolean(false),
2342                        };
2343                        add_table_filter(&mut query, hide_internal);
2344                    }
2345                    Ok(SqlCommand::Select(query))
2346                } else if self.consume(&Token::Vectors)? || self.consume_ident_ci("VECTORS")? {
2347                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2348                        self, "vector",
2349                    )?))
2350                } else if self.consume_ident_ci("DOCUMENTS")? {
2351                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2352                        self, "document",
2353                    )?))
2354                } else if self.consume(&Token::Timeseries)?
2355                    || self.consume_ident_ci("TIMESERIES")?
2356                {
2357                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2358                        self,
2359                        "timeseries",
2360                    )?))
2361                } else if self.consume_ident_ci("GRAPHS")? {
2362                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2363                        self, "graph",
2364                    )?))
2365                } else if self.consume_ident_ci("CONFIGS")? {
2366                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2367                        self, "config",
2368                    )?))
2369                } else if self.consume_ident_ci("VAULTS")? {
2370                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2371                        self, "vault",
2372                    )?))
2373                } else if self.consume(&Token::Kv)?
2374                    || self.consume_ident_ci("KV")?
2375                    || self.consume_ident_ci("KVS")?
2376                {
2377                    Ok(SqlCommand::Select(parse_show_collections_by_model(
2378                        self, "kv",
2379                    )?))
2380                } else if self.consume(&Token::Schema)? || self.consume_ident_ci("SCHEMA")? {
2381                    let collection = self.parse_dotted_admin_path(false)?;
2382                    let mut query = TableQuery::new("red.columns");
2383                    query.filter = Some(Filter::compare(
2384                        FieldRef::column("", "collection"),
2385                        CompareOp::Eq,
2386                        Value::text(collection),
2387                    ));
2388                    Ok(SqlCommand::Select(query))
2389                } else if self.consume_ident_ci("INDICES")? || self.consume_ident_ci("INDEXES")? {
2390                    let mut query = TableQuery::new("red.show_indexes");
2391                    if self.consume(&Token::On)? {
2392                        let collection = self.expect_ident_or_keyword()?;
2393                        let filter = Filter::Compare {
2394                            field: FieldRef::column("", "table"),
2395                            op: CompareOp::Eq,
2396                            value: Value::text(collection),
2397                        };
2398                        query.where_expr = Some(filter_to_expr(&filter));
2399                        query.filter = Some(filter);
2400                    }
2401                    self.parse_table_clauses(&mut query)?;
2402                    Ok(SqlCommand::Select(query))
2403                } else if self.consume_ident_ci("POLICIES")? {
2404                    if self.consume(&Token::For)? || self.consume_ident_ci("FOR")? {
2405                        let principal = self.parse_iam_principal_kind()?;
2406                        return Ok(SqlCommand::IamPolicy(QueryExpr::ShowPolicies {
2407                            filter: Some(principal),
2408                        }));
2409                    }
2410                    let mut query = TableQuery::new("red.policies");
2411                    let collection_filter =
2412                        if self.consume(&Token::On)? || self.consume_ident_ci("ON")? {
2413                            let collection = self.parse_dotted_admin_path(false)?;
2414                            Some(Filter::Compare {
2415                                field: FieldRef::TableColumn {
2416                                    table: String::new(),
2417                                    column: "collection".to_string(),
2418                                },
2419                                op: CompareOp::Eq,
2420                                value: Value::text(collection),
2421                            })
2422                        } else {
2423                            None
2424                        };
2425                    self.parse_table_clauses(&mut query)?;
2426                    if let Some(collection_filter) = collection_filter {
2427                        let combined = match query.filter.take() {
2428                            Some(existing) => {
2429                                Filter::And(Box::new(collection_filter), Box::new(existing))
2430                            }
2431                            None => collection_filter,
2432                        };
2433                        query.where_expr = Some(filter_to_expr(&combined));
2434                        query.filter = Some(combined);
2435                    }
2436                    Ok(SqlCommand::Select(query))
2437                } else if self.consume_ident_ci("STATS")? {
2438                    let mut query = TableQuery::new("red.stats");
2439                    let collection = match self.peek().clone() {
2440                        Token::Ident(name) => {
2441                            self.advance()?;
2442                            Some(name)
2443                        }
2444                        Token::String(name) => {
2445                            self.advance()?;
2446                            Some(name)
2447                        }
2448                        _ => None,
2449                    };
2450                    self.parse_table_clauses(&mut query)?;
2451                    if let Some(collection) = collection {
2452                        let filter = Filter::compare(
2453                            FieldRef::column("red.stats", "collection"),
2454                            CompareOp::Eq,
2455                            Value::text(collection),
2456                        );
2457                        let expr = filter_to_expr(&filter);
2458                        query.where_expr = Some(match query.where_expr.take() {
2459                            Some(existing) => Expr::binop(BinOp::And, existing, expr),
2460                            None => expr,
2461                        });
2462                        query.filter = Some(match query.filter.take() {
2463                            Some(existing) => existing.and(filter),
2464                            None => filter,
2465                        });
2466                    }
2467                    Ok(SqlCommand::Select(query))
2468                } else if self.consume_ident_ci("SAMPLE")? {
2469                    let mut query = TableQuery::new(&self.expect_ident()?);
2470                    query.limit = if self.consume(&Token::Limit)? {
2471                        Some(self.parse_integer()? as u64)
2472                    } else {
2473                        Some(10)
2474                    };
2475                    Ok(SqlCommand::Select(query))
2476                } else if self.consume_ident_ci("SECRET")? || self.consume_ident_ci("SECRETS")? {
2477                    let prefix = if !self.check(&Token::Eof) {
2478                        Some(self.parse_dotted_admin_path(true)?)
2479                    } else {
2480                        None
2481                    };
2482                    Ok(SqlCommand::ShowSecrets { prefix })
2483                } else if self.consume_ident_ci("TENANT")? {
2484                    Ok(SqlCommand::ShowTenant)
2485                } else if let Some(expr) = self.parse_show_iam_after_show()? {
2486                    Ok(SqlCommand::IamPolicy(expr))
2487                } else {
2488                    Err(ParseError::expected(
2489                        vec![
2490                            "CONFIG",
2491                            "SECRET",
2492                            "SECRETS",
2493                            "COLLECTIONS",
2494                            "TABLES",
2495                            "QUEUES",
2496                            "VECTORS",
2497                            "DOCUMENTS",
2498                            "TIMESERIES",
2499                            "GRAPHS",
2500                            "KV",
2501                            "SCHEMA",
2502                            "INDICES",
2503                            "INDEXES",
2504                            "SAMPLE",
2505                            "POLICIES",
2506                            "STATS",
2507                            "TENANT",
2508                            "EFFECTIVE",
2509                        ],
2510                        self.peek(),
2511                        self.position(),
2512                    ))
2513                }
2514            }
2515            // Transaction control statements (Phase 1.1 PG parity).
2516            // BEGIN [WORK | TRANSACTION] [ISOLATION LEVEL <mode>]
2517            // START TRANSACTION [ISOLATION LEVEL <mode>]
2518            //
2519            // We only implement SNAPSHOT ISOLATION (our default). We
2520            // accept READ UNCOMMITTED / READ COMMITTED / REPEATABLE
2521            // READ / SNAPSHOT as PG-compatible no-ops, but reject
2522            // SERIALIZABLE outright — the previous behaviour of
2523            // silently degrading to snapshot made the parser
2524            // dishonest. Real SSI (Serializable Snapshot Isolation)
2525            // is tracked as a future milestone.
2526            Token::Begin | Token::Start => {
2527                self.advance()?;
2528                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2529                // Optional ISOLATION LEVEL clause.
2530                if self.consume_ident_ci("ISOLATION")? {
2531                    self.expect(Token::Level)?;
2532                    // The level identifier can span multiple words
2533                    // (READ UNCOMMITTED / READ COMMITTED / REPEATABLE
2534                    // READ). Collect them case-insensitively.
2535                    let mut parts: Vec<String> = Vec::new();
2536                    if self.consume_ident_ci("READ")? {
2537                        parts.push("READ".to_string());
2538                        if self.consume_ident_ci("UNCOMMITTED")? {
2539                            parts.push("UNCOMMITTED".to_string());
2540                        } else if self.consume_ident_ci("COMMITTED")? {
2541                            parts.push("COMMITTED".to_string());
2542                        } else {
2543                            return Err(ParseError::expected(
2544                                vec!["UNCOMMITTED", "COMMITTED"],
2545                                self.peek(),
2546                                self.position(),
2547                            ));
2548                        }
2549                    } else if self.consume_ident_ci("REPEATABLE")? {
2550                        parts.push("REPEATABLE".to_string());
2551                        if !self.consume_ident_ci("READ")? {
2552                            return Err(ParseError::expected(
2553                                vec!["READ"],
2554                                self.peek(),
2555                                self.position(),
2556                            ));
2557                        }
2558                        parts.push("READ".to_string());
2559                    } else if self.consume_ident_ci("SNAPSHOT")? {
2560                        parts.push("SNAPSHOT".to_string());
2561                    } else if self.consume_ident_ci("SERIALIZABLE")? {
2562                        return Err(ParseError::new(
2563                            "ISOLATION LEVEL SERIALIZABLE is not yet supported — reddb \
2564                             currently provides SNAPSHOT ISOLATION (which PG calls \
2565                             REPEATABLE READ). Use REPEATABLE READ / SNAPSHOT / \
2566                             READ COMMITTED, or omit ISOLATION LEVEL for the default."
2567                                .to_string(),
2568                            self.position(),
2569                        ));
2570                    } else {
2571                        return Err(ParseError::expected(
2572                            vec!["READ", "REPEATABLE", "SNAPSHOT", "SERIALIZABLE"],
2573                            self.peek(),
2574                            self.position(),
2575                        ));
2576                    }
2577                    // All accepted modes map to our snapshot engine today.
2578                    let _ = parts;
2579                }
2580                Ok(SqlCommand::TransactionControl(TxnControl::Begin))
2581            }
2582            // COMMIT [WORK | TRANSACTION]
2583            Token::Commit => {
2584                self.advance()?;
2585                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2586                Ok(SqlCommand::TransactionControl(TxnControl::Commit))
2587            }
2588            // ROLLBACK [WORK | TRANSACTION] [TO [SAVEPOINT] name]
2589            // ROLLBACK MIGRATION name
2590            Token::Rollback => {
2591                self.advance()?;
2592                if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
2593                    match self.parse_rollback_migration_after_keyword()? {
2594                        QueryExpr::RollbackMigration(q) => Ok(SqlCommand::RollbackMigration(q)),
2595                        other => Err(ParseError::new(
2596                            format!(
2597                                "internal: ROLLBACK MIGRATION produced unexpected kind {other:?}"
2598                            ),
2599                            self.position(),
2600                        )),
2601                    }
2602                } else {
2603                    let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2604                    if self.consume(&Token::To)? {
2605                        let _ = self.consume(&Token::Savepoint)?;
2606                        let name = self.expect_ident()?;
2607                        Ok(SqlCommand::TransactionControl(
2608                            TxnControl::RollbackToSavepoint(name),
2609                        ))
2610                    } else {
2611                        Ok(SqlCommand::TransactionControl(TxnControl::Rollback))
2612                    }
2613                }
2614            }
2615            // SAVEPOINT name
2616            Token::Savepoint => {
2617                self.advance()?;
2618                let name = self.expect_ident()?;
2619                Ok(SqlCommand::TransactionControl(TxnControl::Savepoint(name)))
2620            }
2621            // RELEASE [SAVEPOINT] name
2622            Token::Release => {
2623                self.advance()?;
2624                let _ = self.consume(&Token::Savepoint)?;
2625                let name = self.expect_ident()?;
2626                Ok(SqlCommand::TransactionControl(
2627                    TxnControl::ReleaseSavepoint(name),
2628                ))
2629            }
2630            // VACUUM [FULL] [table]
2631            Token::Vacuum => {
2632                self.advance()?;
2633                let full = self.consume(&Token::Full)?;
2634                let target = if self.check(&Token::Eof) {
2635                    None
2636                } else {
2637                    Some(self.expect_ident()?)
2638                };
2639                Ok(SqlCommand::Maintenance(MaintenanceCommand::Vacuum {
2640                    target,
2641                    full,
2642                }))
2643            }
2644            // REFRESH MATERIALIZED VIEW name
2645            Token::Refresh => {
2646                self.advance()?;
2647                self.expect(Token::Materialized)?;
2648                self.expect(Token::View)?;
2649                let name = self.expect_ident()?;
2650                Ok(SqlCommand::RefreshMaterializedView(
2651                    RefreshMaterializedViewQuery { name },
2652                ))
2653            }
2654            // ANALYZE [table]
2655            Token::Analyze => {
2656                self.advance()?;
2657                let target = if self.check(&Token::Eof) {
2658                    None
2659                } else {
2660                    Some(self.expect_ident()?)
2661                };
2662                Ok(SqlCommand::Maintenance(MaintenanceCommand::Analyze {
2663                    target,
2664                }))
2665            }
2666            // COPY table FROM 'path' [WITH (...)] [DELIMITER 'x'] [HEADER]
2667            //
2668            // Accepts both PG-style `WITH (FORMAT csv, HEADER true)` and the
2669            // short-form `DELIMITER ',' HEADER` (bare HEADER; a true/false value
2670            // is only parsed inside WITH). The only supported format today is CSV.
2671            Token::Copy => {
2672                self.advance()?;
2673                let table = self.expect_ident()?;
2674                self.expect(Token::From)?;
2675                let path = self.parse_string()?;
2676
2677                let mut delimiter: Option<char> = None;
2678                let mut has_header = false;
2679                let format = CopyFormat::Csv;
2680
2681                // Optional `WITH (FORMAT csv, HEADER true, DELIMITER ',')` block.
2682                // `WITH` is a reserved keyword token — accept both the keyword
2683                // form and the ident form that non-CTE callers sometimes emit.
2684                if self.consume(&Token::With)? || self.consume_ident_ci("WITH")? {
2685                    self.expect(Token::LParen)?;
2686                    loop {
2687                        if self.consume(&Token::Format)? || self.consume_ident_ci("FORMAT")? {
2688                            let _ = self.consume(&Token::Eq)?;
2689                            // Only CSV for now — accept the ident and move on.
2690                            let _ = self.expect_ident()?;
2691                        } else if self.consume(&Token::Header)? {
2692                            let _ = self.consume(&Token::Eq)?;
2693                            // Accept `HEADER`, `HEADER = true`, `HEADER = false`,
2694                            // or an ident spelling of true/false.
2695                            has_header = match self.peek().clone() {
2696                                Token::True => {
2697                                    self.advance()?;
2698                                    true
2699                                }
2700                                Token::False => {
2701                                    self.advance()?;
2702                                    false
2703                                }
2704                                Token::Ident(ref n) if n.eq_ignore_ascii_case("true") => {
2705                                    self.advance()?;
2706                                    true
2707                                }
2708                                Token::Ident(ref n) if n.eq_ignore_ascii_case("false") => {
2709                                    self.advance()?;
2710                                    false
2711                                }
2712                                _ => true,
2713                            };
2714                        } else if self.consume(&Token::Delimiter)? {
2715                            let _ = self.consume(&Token::Eq)?;
2716                            let s = self.parse_string()?;
2717                            delimiter = s.chars().next();
2718                        } else {
2719                            break;
2720                        }
2721                        if !self.consume(&Token::Comma)? {
2722                            break;
2723                        }
2724                    }
2725                    self.expect(Token::RParen)?;
2726                }
2727
2728                // Short form clauses outside WITH (in either order).
2729                loop {
2730                    if self.consume(&Token::Delimiter)? {
2731                        let s = self.parse_string()?;
2732                        delimiter = s.chars().next();
2733                    } else if self.consume(&Token::Header)? {
2734                        has_header = true;
2735                    } else {
2736                        break;
2737                    }
2738                }
2739
2740                Ok(SqlCommand::CopyFrom(CopyFromQuery {
2741                    table,
2742                    path,
2743                    format,
2744                    delimiter,
2745                    has_header,
2746                }))
2747            }
2748            other => Err(ParseError::expected(
2749                vec![
2750                    "SELECT",
2751                    "FROM",
2752                    "INSERT",
2753                    "UPDATE",
2754                    "DELETE",
2755                    "EXPLAIN",
2756                    "CREATE",
2757                    "DROP",
2758                    "ALTER",
2759                    "SET",
2760                    "SHOW",
2761                    "BEGIN",
2762                    "COMMIT",
2763                    "ROLLBACK",
2764                    "SAVEPOINT",
2765                    "RELEASE",
2766                    "START",
2767                    "VACUUM",
2768                    "ANALYZE",
2769                    "COPY",
2770                    "REFRESH",
2771                    "DESCRIBE",
2772                    "DESC",
2773                ],
2774                other,
2775                self.position(),
2776            )),
2777        }
2778    }
2779}