Skip to main content

reddb_rql/
sql.rs

1use crate::ast::{
2    AlterMetricQuery, AlterQueueQuery, AlterTableQuery, AlterUserStmt, ApplyMigrationQuery,
3    AskQuery, BinOp, CompareOp, ConfigCommand, CopyFormat, CopyFromQuery, CreateCollectionQuery,
4    CreateForeignTableQuery, CreateIndexQuery, CreateMetricQuery, CreateMigrationQuery,
5    CreatePolicyQuery, CreateQueueQuery, CreateSchemaQuery, CreateSequenceQuery, CreateServerQuery,
6    CreateSloQuery, CreateTableQuery, CreateTimeSeriesQuery, CreateTreeQuery, CreateUserStmt,
7    CreateVectorQuery, CreateViewQuery, DeleteQuery, DropCollectionQuery, DropDocumentQuery,
8    DropForeignTableQuery, DropGraphQuery, DropIndexQuery, DropKvQuery, DropPolicyQuery,
9    DropQueueQuery, DropSchemaQuery, DropSequenceQuery, DropServerQuery, DropTableQuery,
10    DropTimeSeriesQuery, DropTreeQuery, DropVectorQuery, DropViewQuery, EventsBackfillQuery,
11    ExplainAlterQuery, ExplainMigrationQuery, Expr, FieldRef, Filter, ForeignColumnDef, GrantStmt,
12    GraphCommand, GraphQuery, HybridQuery, InsertQuery, JoinQuery, KvCommand, MaintenanceCommand,
13    PathQuery, PolicyAction, ProbabilisticCommand, QueryExpr, QueueCommand, QueueSelectQuery,
14    RankOfQuery, RankRangeQuery, RefreshMaterializedViewQuery, RevokeStmt, RollbackMigrationQuery,
15    SearchCommand, Span, TableQuery, TreeCommand, TruncateQuery, TxnControl, UpdateQuery,
16    VectorQuery,
17};
18use crate::lexer::Token;
19use crate::parser::{ParseError, Parser, SafeTokenDisplay};
20use crate::sql_lowering::filter_to_expr;
21use reddb_types::catalog::CollectionModel;
22use reddb_types::types::Value;
23
24/// Canonical SQL frontend command surface.
25///
26/// This is the single entrypoint for SQL/RQL-style commands before they are
27/// lowered into the broader multi-backend `QueryExpr` space.
28#[derive(Debug, Clone)]
29pub enum SqlStatement {
30    Query(SqlQuery),
31    Mutation(SqlMutation),
32    Schema(SqlSchemaCommand),
33    Admin(SqlAdminCommand),
34}
35
36#[derive(Debug, Clone)]
37#[allow(clippy::large_enum_variant)]
38pub enum FrontendStatement {
39    Sql(SqlStatement),
40    Graph(GraphQuery),
41    GraphCommand(GraphCommand),
42    Path(PathQuery),
43    Vector(VectorQuery),
44    Hybrid(HybridQuery),
45    Search(SearchCommand),
46    Ask(AskQuery),
47    QueueSelect(QueueSelectQuery),
48    QueueCommand(QueueCommand),
49    EventsBackfill(EventsBackfillQuery),
50    EventsBackfillStatus { collection: String },
51    TreeCommand(TreeCommand),
52    ProbabilisticCommand(ProbabilisticCommand),
53    KvCommand(KvCommand),
54    ConfigCommand(ConfigCommand),
55    Ranking(QueryExpr),
56}
57
58#[derive(Debug, Clone)]
59pub enum SqlCommand {
60    Select(TableQuery),
61    Join(JoinQuery),
62    Insert(InsertQuery),
63    Update(UpdateQuery),
64    Delete(DeleteQuery),
65    ExplainAlter(ExplainAlterQuery),
66    CreateTable(CreateTableQuery),
67    CreateCollection(CreateCollectionQuery),
68    CreateVector(CreateVectorQuery),
69    DropTable(DropTableQuery),
70    DropGraph(DropGraphQuery),
71    DropVector(DropVectorQuery),
72    DropDocument(DropDocumentQuery),
73    DropKv(DropKvQuery),
74    DropCollection(DropCollectionQuery),
75    Truncate(TruncateQuery),
76    AlterTable(AlterTableQuery),
77    CreateIndex(CreateIndexQuery),
78    DropIndex(DropIndexQuery),
79    CreateTimeSeries(CreateTimeSeriesQuery),
80    CreateMetric(CreateMetricQuery),
81    AlterMetric(AlterMetricQuery),
82    CreateSlo(CreateSloQuery),
83    DropTimeSeries(DropTimeSeriesQuery),
84    CreateQueue(CreateQueueQuery),
85    AlterQueue(AlterQueueQuery),
86    DropQueue(DropQueueQuery),
87    CreateTree(CreateTreeQuery),
88    DropTree(DropTreeQuery),
89    Probabilistic(ProbabilisticCommand),
90    SetConfig {
91        key: String,
92        value: Value,
93    },
94    ShowConfig {
95        prefix: Option<String>,
96        as_json: bool,
97    },
98    SetSecret {
99        key: String,
100        value: Value,
101    },
102    DeleteSecret {
103        key: String,
104    },
105    ShowSecrets {
106        prefix: Option<String>,
107    },
108    SetTenant(Option<String>),
109    ShowTenant,
110    TransactionControl(TxnControl),
111    Maintenance(MaintenanceCommand),
112    CreateSchema(CreateSchemaQuery),
113    DropSchema(DropSchemaQuery),
114    CreateSequence(CreateSequenceQuery),
115    DropSequence(DropSequenceQuery),
116    CopyFrom(CopyFromQuery),
117    CreateView(CreateViewQuery),
118    DropView(DropViewQuery),
119    RefreshMaterializedView(RefreshMaterializedViewQuery),
120    CreatePolicy(CreatePolicyQuery),
121    DropPolicy(DropPolicyQuery),
122    CreateServer(CreateServerQuery),
123    DropServer(DropServerQuery),
124    CreateForeignTable(CreateForeignTableQuery),
125    DropForeignTable(DropForeignTableQuery),
126    /// `GRANT … ON … TO …`
127    Grant(GrantStmt),
128    /// `REVOKE … ON … FROM …`
129    Revoke(RevokeStmt),
130    /// `ALTER USER name <attrs>`
131    AlterUser(AlterUserStmt),
132    /// `CREATE USER name PASSWORD '...' [ROLE read|write|admin]`
133    CreateUser(CreateUserStmt),
134    /// IAM policy DDL (CREATE POLICY '...' AS '...', DROP POLICY '...',
135    /// ATTACH/DETACH POLICY, SHOW POLICIES, SIMULATE, SHOW EFFECTIVE
136    /// PERMISSIONS). Stored as a pre-built QueryExpr so the dispatcher
137    /// can route the multitude of shapes through a single arm.
138    IamPolicy(QueryExpr),
139    CreateMigration(CreateMigrationQuery),
140    ApplyMigration(ApplyMigrationQuery),
141    RollbackMigration(RollbackMigrationQuery),
142    ExplainMigration(ExplainMigrationQuery),
143}
144
145/// Issue #789 — Analytics v0 non-goal map for `CREATE …` forms.
146///
147/// PRD #782 ringfences Analytics v0 around a metric-centric catalog and
148/// explicitly excludes generic analytics objects, a separate event
149/// storage model, cohorts, funnels, SLA contracts, and adapter surfaces.
150/// When the parser sees one of these idents in the `CREATE` head, return
151/// a stable v0-scoped rejection message; otherwise return `None` and let
152/// the regular CREATE fallback handle the token.
153fn analytics_v0_non_goal_create(token: &Token) -> Option<String> {
154    let ident = match token {
155        Token::Ident(s) => s,
156        _ => return None,
157    };
158    let upper = ident.to_ascii_uppercase();
159    let message = match upper.as_str() {
160        "ANALYTICS" => {
161            "CREATE ANALYTICS is not supported in Analytics v0 — \
162             use CREATE METRIC <dotted.path> for the metric-centric \
163             catalog (PRD #782 non-goal)"
164        }
165        "EVENT" => {
166            "CREATE EVENT is not supported in Analytics v0 — \
167             event-shaped data lives in ordinary TABLE/DOCUMENT \
168             collections, not a new storage model (PRD #782 non-goal)"
169        }
170        "COHORT" => {
171            "CREATE COHORT is not supported in Analytics v0 — \
172             cohort surfaces are deferred (PRD #782 non-goal)"
173        }
174        "FUNNEL" => {
175            "CREATE FUNNEL is not supported in Analytics v0 — \
176             funnel surfaces are deferred (PRD #782 non-goal)"
177        }
178        "SLA" => {
179            "CREATE SLA is not supported in Analytics v0 — \
180             SLA/legal/commercial contract modeling is post-MVP \
181             (PRD #782 non-goal)"
182        }
183        "ADAPTER" => {
184            "CREATE ADAPTER is not supported in Analytics v0 — \
185             Prometheus/Grafana/Snowplow/Google Analytics adapters \
186             are deferred (PRD #782 non-goal)"
187        }
188        _ => return None,
189    };
190    Some(message.to_string())
191}
192
193fn collection_model_filter(model: &str) -> Filter {
194    Filter::Compare {
195        field: FieldRef::column("", "model"),
196        op: CompareOp::Eq,
197        value: Value::Text(model.to_string().into()),
198    }
199}
200
201#[cfg(test)]
202mod tests {
203    use super::*;
204    use reddb_types::catalog::CollectionModel;
205
206    fn frontend(input: &str) -> FrontendStatement {
207        parse_frontend(input)
208            .unwrap_or_else(|err| panic!("failed to parse frontend {input:?}: {err:?}"))
209    }
210
211    fn expr(input: &str) -> QueryExpr {
212        frontend(input).into_query_expr()
213    }
214
215    fn sql_command(input: &str) -> SqlCommand {
216        sql_command_result(input)
217            .unwrap_or_else(|err| panic!("failed to parse SQL command {input:?}: {err:?}"))
218    }
219
220    fn sql_command_result(input: &str) -> Result<SqlCommand, ParseError> {
221        let mut parser = Parser::new(input)?;
222        parser.parse_sql_command()
223    }
224
225    fn assert_text(value: &Value, expected: &str) {
226        match value {
227            Value::Text(text) => assert_eq!(text.as_ref(), expected),
228            other => panic!("expected text {expected:?}, got {other:?}"),
229        }
230    }
231
232    #[test]
233    fn parse_frontend_routes_core_sql_statements() {
234        let FrontendStatement::Sql(SqlStatement::Query(SqlQuery::Select(query))) =
235            frontend("SELECT * FROM users")
236        else {
237            panic!("SELECT should route to SqlStatement::Query::Select");
238        };
239        assert_eq!(query.table, "users");
240
241        let QueryExpr::Insert(query) = expr("INSERT INTO users (id, name) VALUES (1, 'ada')")
242        else {
243            panic!("INSERT should lower through the SQL frontend");
244        };
245        assert_eq!(query.table, "users");
246        assert_eq!(query.columns, vec!["id", "name"]);
247        assert_eq!(query.values.len(), 1);
248
249        let QueryExpr::Update(query) = expr("UPDATE users SET name = 'ada' WHERE id = 1") else {
250            panic!("UPDATE should lower through the SQL frontend");
251        };
252        assert_eq!(query.table, "users");
253        assert_eq!(query.assignments[0].0, "name");
254
255        let QueryExpr::Delete(query) = expr("DELETE FROM users WHERE id = 1") else {
256            panic!("DELETE should lower through the SQL frontend");
257        };
258        assert_eq!(query.table, "users");
259        assert!(query.filter.is_some());
260
261        let QueryExpr::CreateTable(query) = expr("CREATE TABLE users (id INT, name TEXT)") else {
262            panic!("CREATE TABLE should lower through the SQL frontend");
263        };
264        assert_eq!(query.collection_model, CollectionModel::Table);
265        assert_eq!(query.name, "users");
266        assert_eq!(query.columns[0].name, "id");
267
268        let QueryExpr::DropTable(query) = expr("DROP TABLE IF EXISTS users") else {
269            panic!("DROP TABLE should lower through the SQL frontend");
270        };
271        assert_eq!(query.name, "users");
272        assert!(query.if_exists);
273    }
274
275    #[test]
276    fn parse_frontend_routes_admin_and_catalog_sql() {
277        let QueryExpr::Table(query) = expr("SHOW COLLECTIONS") else {
278            panic!("SHOW COLLECTIONS should become a red.collections table query");
279        };
280        assert_eq!(query.table, "red.collections");
281        assert!(query.filter.is_some());
282
283        let QueryExpr::Table(query) = expr("SHOW TABLES LIMIT 5") else {
284            panic!("SHOW TABLES should become a filtered red.collections table query");
285        };
286        assert_eq!(query.table, "red.collections");
287        assert_eq!(query.limit, Some(5));
288        assert!(query.filter.is_some());
289
290        assert!(matches!(
291            expr("SHOW CONFIG durability.mode"),
292            QueryExpr::ShowConfig { prefix: Some(prefix), as_json: false } if prefix == "durability.mode"
293        ));
294        assert!(matches!(
295            expr("SHOW CONFIG"),
296            QueryExpr::ShowConfig {
297                prefix: None,
298                as_json: false
299            }
300        ));
301        assert!(matches!(
302            expr("SHOW CONFIG runtime.result_cache AS JSON"),
303            QueryExpr::ShowConfig { prefix: Some(prefix), as_json: true } if prefix == "runtime.result_cache"
304        ));
305        assert!(matches!(
306            expr("SHOW CONFIG FORMAT JSON"),
307            QueryExpr::ShowConfig {
308                prefix: None,
309                as_json: true
310            }
311        ));
312
313        let QueryExpr::SetConfig { key, value } = expr("SET CONFIG durability.mode = 'sync'")
314        else {
315            panic!("SET CONFIG should stay on the SQL admin surface");
316        };
317        assert_eq!(key, "durability.mode");
318        assert_text(&value, "sync");
319
320        let QueryExpr::SetSecret { key, value } = expr("SET SECRET provider.api_key = 'sk_test'")
321        else {
322            panic!("SET SECRET should stay on the SQL admin surface");
323        };
324        assert_eq!(key, "provider.api_key");
325        assert_text(&value, "sk_test");
326        assert!(matches!(
327            expr("SET SECRET red.secrets.provider.api_key = 'sk_test'"),
328            QueryExpr::SetSecret { key, .. } if key == "red.secret.provider.api_key"
329        ));
330
331        assert!(matches!(
332            expr("DELETE SECRET provider.api_key"),
333            QueryExpr::DeleteSecret { key } if key == "provider.api_key"
334        ));
335        assert!(matches!(
336            expr("DELETE SECRET red.secrets.provider.api_key"),
337            QueryExpr::DeleteSecret { key } if key == "red.secret.provider.api_key"
338        ));
339        assert!(matches!(
340            expr("SHOW SECRETS provider"),
341            QueryExpr::ShowSecrets { prefix: Some(prefix) } if prefix == "provider"
342        ));
343        assert!(matches!(
344            expr("SHOW SECRETS red.secrets.provider"),
345            QueryExpr::ShowSecrets { prefix: Some(prefix) } if prefix == "red.secret.provider"
346        ));
347        assert!(matches!(
348            expr("SET TENANT 'acme'"),
349            QueryExpr::SetTenant(Some(tenant)) if tenant == "acme"
350        ));
351        assert!(matches!(expr("RESET TENANT"), QueryExpr::SetTenant(None)));
352        assert!(matches!(expr("SHOW TENANT"), QueryExpr::ShowTenant));
353        assert!(matches!(
354            expr("BEGIN ISOLATION LEVEL SNAPSHOT"),
355            QueryExpr::TransactionControl(TxnControl::Begin)
356        ));
357        assert!(matches!(
358            expr("ROLLBACK TO SAVEPOINT sp1"),
359            QueryExpr::TransactionControl(TxnControl::RollbackToSavepoint(name)) if name == "sp1"
360        ));
361        assert!(matches!(
362            expr("VACUUM FULL users"),
363            QueryExpr::MaintenanceCommand(MaintenanceCommand::Vacuum {
364                target: Some(target),
365                full: true,
366            }) if target == "users"
367        ));
368    }
369
370    #[test]
371    fn parse_frontend_routes_extended_schema_sql() {
372        assert!(matches!(
373            expr("CREATE SCHEMA IF NOT EXISTS app"),
374            QueryExpr::CreateSchema(CreateSchemaQuery {
375                name,
376                if_not_exists: true,
377            }) if name == "app"
378        ));
379        assert!(matches!(
380            expr("DROP SCHEMA IF EXISTS app CASCADE"),
381            QueryExpr::DropSchema(DropSchemaQuery {
382                name,
383                if_exists: true,
384                cascade: true,
385            }) if name == "app"
386        ));
387        assert!(matches!(
388            expr("CREATE SEQUENCE IF NOT EXISTS seq START WITH 10 INCREMENT BY 2"),
389            QueryExpr::CreateSequence(CreateSequenceQuery {
390                name,
391                if_not_exists: true,
392                start: 10,
393                increment: 2,
394            }) if name == "seq"
395        ));
396        assert!(matches!(
397            expr("DROP SEQUENCE IF EXISTS seq"),
398            QueryExpr::DropSequence(DropSequenceQuery {
399                name,
400                if_exists: true,
401            }) if name == "seq"
402        ));
403
404        let QueryExpr::CopyFrom(copy) = expr(
405            "COPY users FROM '/tmp/u.csv' WITH (FORMAT = csv, HEADER = true, DELIMITER = ';')",
406        ) else {
407            panic!("COPY should lower through SQL frontend");
408        };
409        assert_eq!(copy.table, "users");
410        assert_eq!(copy.path, "/tmp/u.csv");
411        assert_eq!(copy.format, CopyFormat::Csv);
412        assert_eq!(copy.delimiter, Some(';'));
413        assert!(copy.has_header);
414
415        let QueryExpr::CreateView(view) = expr(
416            "CREATE MATERIALIZED VIEW IF NOT EXISTS mv WITH RETENTION 1 h \
417             AS SELECT id FROM users REFRESH EVERY 5 s",
418        ) else {
419            panic!("CREATE MATERIALIZED VIEW should lower through SQL frontend");
420        };
421        assert_eq!(view.name, "mv");
422        assert!(view.materialized);
423        assert!(view.if_not_exists);
424        assert_eq!(view.retention_duration_ms, Some(3_600_000));
425        assert_eq!(view.refresh_every_ms, Some(5_000));
426        assert!(matches!(*view.query, QueryExpr::Table(_)));
427
428        assert!(matches!(
429            expr("DROP MATERIALIZED VIEW IF EXISTS mv"),
430            QueryExpr::DropView(DropViewQuery {
431                name,
432                materialized: true,
433                if_exists: true,
434            }) if name == "mv"
435        ));
436        assert!(matches!(
437            expr("REFRESH MATERIALIZED VIEW mv"),
438            QueryExpr::RefreshMaterializedView(RefreshMaterializedViewQuery { name }) if name == "mv"
439        ));
440    }
441
442    #[test]
443    fn parse_frontend_routes_fdw_policy_auth_and_migrations() {
444        let QueryExpr::CreateServer(server) = expr(
445            "CREATE SERVER IF NOT EXISTS csvsrv FOREIGN DATA WRAPPER csv OPTIONS (path '/data')",
446        ) else {
447            panic!("CREATE SERVER should lower through SQL frontend");
448        };
449        assert_eq!(server.name, "csvsrv");
450        assert_eq!(server.wrapper, "csv");
451        assert!(server.if_not_exists);
452        assert_eq!(
453            server.options,
454            vec![("path".to_string(), "/data".to_string())]
455        );
456
457        let QueryExpr::CreateForeignTable(table) = expr(
458            "CREATE FOREIGN TABLE IF NOT EXISTS ext_users \
459             (id INT, name TEXT) SERVER csvsrv OPTIONS (file 'users.csv')",
460        ) else {
461            panic!("CREATE FOREIGN TABLE should lower through SQL frontend");
462        };
463        assert_eq!(table.name, "ext_users");
464        assert_eq!(table.server, "csvsrv");
465        assert!(table.if_not_exists);
466        assert_eq!(table.columns.len(), 2);
467        assert!(!table.columns[0].not_null);
468
469        assert!(matches!(
470            expr("DROP SERVER IF EXISTS csvsrv CASCADE"),
471            QueryExpr::DropServer(DropServerQuery {
472                name,
473                if_exists: true,
474                cascade: true,
475            }) if name == "csvsrv"
476        ));
477        assert!(matches!(
478            expr("DROP FOREIGN TABLE IF EXISTS ext_users"),
479            QueryExpr::DropForeignTable(DropForeignTableQuery {
480                name,
481                if_exists: true,
482            }) if name == "ext_users"
483        ));
484
485        let QueryExpr::CreatePolicy(policy) = expr(
486            "CREATE POLICY readonly ON NODES OF mygraph FOR SELECT TO analytics USING (public = 1)",
487        ) else {
488            panic!("CREATE POLICY should lower through SQL frontend");
489        };
490        assert_eq!(policy.name, "readonly");
491        assert_eq!(policy.table, "mygraph");
492        assert_eq!(policy.action, Some(PolicyAction::Select));
493        assert_eq!(policy.role.as_deref(), Some("analytics"));
494        assert_eq!(policy.target_kind.as_ident(), "nodes");
495
496        assert!(matches!(
497            expr("DROP POLICY IF EXISTS readonly ON mygraph"),
498            QueryExpr::DropPolicy(DropPolicyQuery {
499                name,
500                table,
501                if_exists: true,
502            }) if name == "readonly" && table == "mygraph"
503        ));
504
505        assert!(matches!(
506            expr("GRANT SELECT ON TABLE public.users TO tenant1.alice"),
507            QueryExpr::Grant(grant)
508                if grant.actions == vec!["SELECT"]
509                    && grant.objects[0].schema.as_deref() == Some("public")
510        ));
511        assert!(matches!(
512            expr("REVOKE GRANT OPTION FOR USAGE ON SCHEMA analytics FROM GROUP analysts"),
513            QueryExpr::Revoke(revoke) if revoke.grant_option_for && revoke.all == false
514        ));
515        assert!(matches!(
516            expr("ALTER USER bob ENABLE SET search_path TO 'public'"),
517            QueryExpr::AlterUser(user)
518                if user.username == "bob" && user.attributes.len() == 2
519        ));
520        assert!(matches!(
521            expr("CREATE USER tenant1.alice WITH PASSWORD 'pw' ROLE write"),
522            QueryExpr::CreateUser(user)
523                if user.tenant.as_deref() == Some("tenant1")
524                    && user.username == "alice"
525                    && user.password == "pw"
526                    && user.role == "write"
527        ));
528
529        assert!(matches!(
530            expr("CREATE POLICY 'readonly' AS '{\"Statement\":[]}'"),
531            QueryExpr::CreateIamPolicy { id, json }
532                if id == "readonly" && json == "{\"Statement\":[]}"
533        ));
534        assert!(matches!(
535            expr("DROP POLICY 'readonly'"),
536            QueryExpr::DropIamPolicy { id } if id == "readonly"
537        ));
538
539        assert!(matches!(
540            expr("CREATE MIGRATION m2 DEPENDS ON m0 BATCH 10 ROWS AS CREATE TABLE accounts (id INT)"),
541            QueryExpr::CreateMigration(migration)
542                if migration.name == "m2"
543                    && migration.depends_on == vec!["m0".to_string()]
544                    && migration.batch_size == Some(10)
545        ));
546        assert!(matches!(
547            expr("APPLY MIGRATION * FOR TENANT tenant1"),
548            QueryExpr::ApplyMigration(apply)
549                if apply.for_tenant.as_deref() == Some("tenant1")
550        ));
551        assert!(matches!(
552            expr("ROLLBACK MIGRATION m2"),
553            QueryExpr::RollbackMigration(RollbackMigrationQuery { name }) if name == "m2"
554        ));
555        assert!(matches!(
556            expr("EXPLAIN MIGRATION m2"),
557            QueryExpr::ExplainMigration(ExplainMigrationQuery { name }) if name == "m2"
558        ));
559    }
560
561    #[test]
562    fn parse_sql_statement_covers_statement_category_wrapping() {
563        enum Expected {
564            Select,
565            Insert,
566            CreateSchema,
567            SetTenant,
568        }
569
570        let cases = [
571            ("SELECT * FROM users", Expected::Select),
572            ("INSERT INTO users (id) VALUES (1)", Expected::Insert),
573            ("CREATE SCHEMA app", Expected::CreateSchema),
574            ("SET TENANT 'acme'", Expected::SetTenant),
575        ];
576
577        for (input, expected) in cases {
578            let mut parser = Parser::new(input).expect("lexer");
579            let statement = parser
580                .parse_sql_statement()
581                .unwrap_or_else(|err| panic!("failed to parse {input:?}: {err:?}"));
582            let matched = match expected {
583                Expected::Select => matches!(statement, SqlStatement::Query(SqlQuery::Select(_))),
584                Expected::Insert => {
585                    matches!(statement, SqlStatement::Mutation(SqlMutation::Insert(_)))
586                }
587                Expected::CreateSchema => matches!(
588                    statement,
589                    SqlStatement::Schema(SqlSchemaCommand::CreateSchema(_))
590                ),
591                Expected::SetTenant => {
592                    matches!(
593                        statement,
594                        SqlStatement::Admin(SqlAdminCommand::SetTenant(_))
595                    )
596                }
597            };
598            assert!(matched, "{input}");
599        }
600    }
601
602    #[test]
603    fn parse_frontend_routes_non_sql_frontends() {
604        let QueryExpr::KvCommand(KvCommand::Get {
605            model,
606            collection,
607            key,
608        }) = expr("KV GET settings.feature")
609        else {
610            panic!("KV GET should route to FrontendStatement::KvCommand");
611        };
612        assert_eq!(model, CollectionModel::Kv);
613        assert_eq!(collection, "settings");
614        assert_eq!(key, "feature");
615
616        let QueryExpr::ConfigCommand(ConfigCommand::Watch {
617            collection,
618            key,
619            prefix,
620            from_lsn,
621        }) = expr("WATCH CONFIG app PREFIX feature FROM LSN 7")
622        else {
623            panic!("WATCH CONFIG should route to FrontendStatement::ConfigCommand");
624        };
625        assert_eq!(collection, "app");
626        assert_eq!(key, "feature");
627        assert!(prefix);
628        assert_eq!(from_lsn, Some(7));
629
630        let QueryExpr::ConfigCommand(ConfigCommand::List {
631            collection,
632            prefix,
633            limit,
634            offset,
635        }) = expr("LIST CONFIG app PREFIX feature LIMIT 3 OFFSET 1")
636        else {
637            panic!("LIST CONFIG should route to FrontendStatement::ConfigCommand");
638        };
639        assert_eq!(collection, "app");
640        assert_eq!(prefix.as_deref(), Some("feature"));
641        assert_eq!(limit, Some(3));
642        assert_eq!(offset, 1);
643
644        let QueryExpr::KvCommand(KvCommand::List {
645            model,
646            collection,
647            prefix,
648            limit,
649            offset,
650            as_json,
651        }) = expr("KV LIST settings PREFIX 'feature.' LIMIT 10 OFFSET 2")
652        else {
653            panic!("KV LIST should route to FrontendStatement::KvCommand");
654        };
655        assert_eq!(model, CollectionModel::Kv);
656        assert_eq!(collection, "settings");
657        assert_eq!(prefix.as_deref(), Some("feature."));
658        assert_eq!(limit, Some(10));
659        assert_eq!(offset, 2);
660        assert!(!as_json);
661
662        let QueryExpr::KvCommand(KvCommand::List {
663            model,
664            collection,
665            prefix,
666            limit,
667            offset,
668            as_json,
669        }) = expr("LIST KV settings PREFIX feature LIMIT 10 OFFSET 2")
670        else {
671            panic!("LIST KV should route to FrontendStatement::KvCommand");
672        };
673        assert_eq!(model, CollectionModel::Kv);
674        assert_eq!(collection, "settings");
675        assert_eq!(prefix.as_deref(), Some("feature"));
676        assert_eq!(limit, Some(10));
677        assert_eq!(offset, 2);
678        assert!(!as_json);
679
680        let QueryExpr::KvCommand(KvCommand::List {
681            model,
682            collection,
683            prefix,
684            as_json,
685            ..
686        }) = expr("KV LIST settings PREFIX feature AS JSON")
687        else {
688            panic!("KV LIST AS JSON should route to FrontendStatement::KvCommand");
689        };
690        assert_eq!(model, CollectionModel::Kv);
691        assert_eq!(collection, "settings");
692        assert_eq!(prefix.as_deref(), Some("feature"));
693        assert!(as_json);
694
695        let QueryExpr::KvCommand(KvCommand::Watch {
696            model,
697            collection,
698            key,
699            prefix,
700            from_lsn,
701        }) = expr("WATCH sessions.user.* FROM LSN 3")
702        else {
703            panic!("bare WATCH should route to FrontendStatement::KvCommand");
704        };
705        assert_eq!(model, CollectionModel::Kv);
706        assert_eq!(collection, "sessions");
707        assert_eq!(key, "user");
708        assert!(prefix);
709        assert_eq!(from_lsn, Some(3));
710
711        let QueryExpr::KvCommand(KvCommand::Watch {
712            model,
713            collection,
714            key,
715            prefix,
716            from_lsn,
717        }) = expr("WATCH VAULT secrets PREFIX api FROM LSN 7")
718        else {
719            panic!("WATCH VAULT should route to FrontendStatement::KvCommand");
720        };
721        assert_eq!(model, CollectionModel::Vault);
722        assert_eq!(collection, "secrets");
723        assert_eq!(key, "api");
724        assert!(prefix);
725        assert_eq!(from_lsn, Some(7));
726
727        let QueryExpr::KvCommand(KvCommand::List {
728            model,
729            collection,
730            prefix,
731            limit,
732            offset,
733            as_json,
734        }) = expr("LIST VAULT secrets PREFIX api LIMIT 10 OFFSET 2")
735        else {
736            panic!("LIST VAULT should route to FrontendStatement::KvCommand");
737        };
738        assert_eq!(model, CollectionModel::Vault);
739        assert_eq!(collection, "secrets");
740        assert_eq!(prefix.as_deref(), Some("api"));
741        assert_eq!(limit, Some(10));
742        assert_eq!(offset, 2);
743        assert!(!as_json);
744
745        assert!(matches!(
746            expr("INVALIDATE CONFIG app feature_flag"),
747            QueryExpr::ConfigCommand(ConfigCommand::InvalidVolatileOperation {
748                operation,
749                collection,
750                key: Some(key),
751            }) if operation == "INVALIDATE" && collection == "app" && key == "feature_flag"
752        ));
753        assert!(matches!(
754            expr("INVALIDATE TAGS [user:42, org:7] FROM sessions"),
755            QueryExpr::KvCommand(KvCommand::InvalidateTags { collection, tags })
756                if collection == "sessions" && tags == vec!["user:42".to_string(), "org:7".to_string()]
757        ));
758
759        let QueryExpr::EventsBackfill(query) =
760            expr("EVENTS BACKFILL users WHERE status = 'active' TO audit LIMIT 10")
761        else {
762            panic!("EVENTS BACKFILL should route to FrontendStatement::EventsBackfill");
763        };
764        assert_eq!(query.collection, "users");
765        assert_eq!(query.where_filter.as_deref(), Some("status = 'active'"));
766        assert_eq!(query.target_queue, "audit");
767        assert_eq!(query.limit, Some(10));
768
769        let QueryExpr::Table(query) = expr("EVENTS STATUS users LIMIT 2") else {
770            panic!("EVENTS STATUS should route through the SQL select surface");
771        };
772        assert_eq!(query.table, "red.subscriptions");
773        assert_eq!(query.limit, Some(2));
774        assert!(query.filter.is_some());
775
776        assert!(matches!(
777            expr("EVENTS BACKFILL STATUS users"),
778            QueryExpr::EventsBackfillStatus { collection } if collection == "users"
779        ));
780        assert!(parse_frontend("LIST UNKNOWN").is_err());
781        assert!(parse_frontend("EVENTS UNKNOWN").is_err());
782    }
783
784    #[test]
785    fn parse_frontend_routes_ranking_reads() {
786        assert!(matches!(
787            expr("RANK OF 42 IN page_rank"),
788            QueryExpr::RankOf(RankOfQuery { ranking, entity_id })
789                if ranking == "page_rank" && entity_id == 42
790        ));
791        assert!(matches!(
792            expr("APPROX RANK OF 7 IN page_rank"),
793            QueryExpr::ApproxRankOf(RankOfQuery { ranking, entity_id })
794                if ranking == "page_rank" && entity_id == 7
795        ));
796        assert!(matches!(
797            expr("RANK RANGE 1 TO 3 IN page_rank"),
798            QueryExpr::RankRange(RankRangeQuery { ranking, lo, hi })
799                if ranking == "page_rank" && lo == 1 && hi == 3
800        ));
801        assert!(matches!(
802            expr("ZRANK page_rank 0"),
803            QueryExpr::RankOf(RankOfQuery { ranking, entity_id })
804                if ranking == "page_rank" && entity_id == 0
805        ));
806        assert!(matches!(
807            expr("ZRANGE page_rank 0 3 WITHSCORES"),
808            QueryExpr::RankRange(RankRangeQuery { ranking, lo, hi })
809                if ranking == "page_rank" && lo == 1 && hi == 4
810        ));
811        assert!(
812            parse_frontend("RANK RANGE 3 TO 1 IN page_rank").is_err(),
813            "rank range must reject reversed bounds"
814        );
815    }
816
817    #[test]
818    fn parse_frontend_covers_multimodel_command_routing() {
819        assert!(matches!(
820            expr("GRAPH CENTRALITY ALGORITHM pagerank LIMIT 5"),
821            QueryExpr::GraphCommand(GraphCommand::Centrality {
822                algorithm,
823                limit: Some(5),
824                ..
825            }) if algorithm == "pagerank"
826        ));
827        assert!(matches!(
828            expr("SEARCH TEXT 'login failure' COLLECTION incidents LIMIT 20 FUZZY"),
829            QueryExpr::SearchCommand(SearchCommand::Text {
830                query,
831                collection: Some(collection),
832                limit: 20,
833                fuzzy: true,
834                ..
835            }) if query == "login failure" && collection == "incidents"
836        ));
837        assert!(matches!(
838            expr("ASK 'why did login fail?' USING openai LIMIT 3"),
839            QueryExpr::Ask(query)
840                if query.question == "why did login fail?"
841                    && query.provider.as_deref() == Some("openai")
842                    && query.limit == Some(3)
843        ));
844        assert!(matches!(
845            expr("QUEUE LEN tasks"),
846            QueryExpr::QueueCommand(QueueCommand::Len { queue }) if queue == "tasks"
847        ));
848        assert!(matches!(
849            expr("TREE REBALANCE forest.org DRY RUN"),
850            QueryExpr::TreeCommand(TreeCommand::Rebalance {
851                collection,
852                tree_name,
853                dry_run: true,
854            }) if collection == "forest" && tree_name == "org"
855        ));
856        assert!(matches!(
857            expr("HLL COUNT visitors"),
858            QueryExpr::ProbabilisticCommand(ProbabilisticCommand::HllCount { names })
859                if names == vec!["visitors".to_string()]
860        ));
861    }
862
863    #[test]
864    fn sql_command_round_trips_multimodel_schema_variants() {
865        macro_rules! assert_command_round_trip {
866            ($input:expr, $pattern:pat) => {{
867                let command = sql_command($input);
868                assert!(matches!(command, $pattern), "unexpected command for {}", $input);
869
870                let statement = sql_command($input).into_statement();
871                let command = statement.into_command();
872                assert!(
873                    matches!(command, $pattern),
874                    "statement round trip changed command for {}",
875                    $input
876                );
877
878                let expr = sql_command($input).into_query_expr();
879                assert!(
880                    !matches!(expr, QueryExpr::Table(TableQuery { table, .. }) if table.is_empty()),
881                    "lowering produced an empty table placeholder for {}",
882                    $input
883                );
884            }};
885        }
886
887        assert_command_round_trip!(
888            "EXPLAIN ALTER FOR CREATE TABLE users (id INT) FORMAT JSON",
889            SqlCommand::ExplainAlter(_)
890        );
891        assert_command_round_trip!("CREATE TABLE users (id INT)", SqlCommand::CreateTable(_));
892        assert_command_round_trip!("DROP TABLE IF EXISTS users", SqlCommand::DropTable(_));
893        assert_command_round_trip!(
894            "ALTER TABLE users ADD COLUMN status TEXT",
895            SqlCommand::AlterTable(_)
896        );
897        assert_command_round_trip!(
898            "CREATE INDEX idx_email ON users (email) USING HASH",
899            SqlCommand::CreateIndex(_)
900        );
901        assert_command_round_trip!(
902            "DROP INDEX IF EXISTS idx_email ON users",
903            SqlCommand::DropIndex(_)
904        );
905        assert_command_round_trip!("CREATE GRAPH identity", SqlCommand::CreateTable(_));
906        assert_command_round_trip!("CREATE DOCUMENT docs", SqlCommand::CreateTable(_));
907        assert_command_round_trip!(
908            "CREATE VECTOR embeddings DIM 4",
909            SqlCommand::CreateVector(_)
910        );
911        assert_command_round_trip!(
912            "CREATE COLLECTION turbo KIND vector.turbo DIM 3",
913            SqlCommand::CreateCollection(_)
914        );
915        assert_command_round_trip!("CREATE KV settings", SqlCommand::CreateTable(_));
916        assert_command_round_trip!("CREATE CONFIG app", SqlCommand::CreateTable(_));
917        assert_command_round_trip!(
918            "CREATE VAULT secrets WITH OWN MASTER KEY",
919            SqlCommand::CreateTable(_)
920        );
921        assert_command_round_trip!(
922            "CREATE TIMESERIES metrics RETENTION 90 d",
923            SqlCommand::CreateTimeSeries(_)
924        );
925        assert_command_round_trip!(
926            "CREATE METRIC svc.latency TYPE gauge ROLE sli",
927            SqlCommand::CreateMetric(_)
928        );
929        assert_command_round_trip!(
930            "ALTER METRIC svc.latency SET ROLE internal",
931            SqlCommand::AlterMetric(_)
932        );
933        assert_command_round_trip!(
934            "CREATE SLO svc.availability ON svc.latency TARGET 99.9 WINDOW 5 m",
935            SqlCommand::CreateSlo(_)
936        );
937        assert_command_round_trip!(
938            "CREATE QUEUE tasks MAX_SIZE 100",
939            SqlCommand::CreateQueue(_)
940        );
941        assert_command_round_trip!(
942            "ALTER QUEUE tasks SET MODE FANOUT",
943            SqlCommand::AlterQueue(_)
944        );
945        assert_command_round_trip!(
946            "CREATE TREE org IN forest ROOT LABEL root MAX_CHILDREN 4",
947            SqlCommand::CreateTree(_)
948        );
949        assert_command_round_trip!(
950            "CREATE HLL visitors PRECISION 14",
951            SqlCommand::Probabilistic(_)
952        );
953        assert_command_round_trip!(
954            "CREATE SKETCH freqs WIDTH 512 DEPTH 3",
955            SqlCommand::Probabilistic(_)
956        );
957        assert_command_round_trip!(
958            "CREATE FILTER seen CAPACITY 1024",
959            SqlCommand::Probabilistic(_)
960        );
961        assert_command_round_trip!("COPY users FROM '/tmp/u.csv'", SqlCommand::CopyFrom(_));
962        assert_command_round_trip!(
963            "CREATE VIEW active_users AS SELECT * FROM users",
964            SqlCommand::CreateView(_)
965        );
966        assert_command_round_trip!("DROP VIEW active_users", SqlCommand::DropView(_));
967        assert_command_round_trip!(
968            "REFRESH MATERIALIZED VIEW active_users",
969            SqlCommand::RefreshMaterializedView(_)
970        );
971        assert_command_round_trip!(
972            "CREATE SERVER mycsv FOREIGN DATA WRAPPER csv OPTIONS (base_path '/data')",
973            SqlCommand::CreateServer(_)
974        );
975        assert_command_round_trip!(
976            "DROP SERVER IF EXISTS mycsv CASCADE",
977            SqlCommand::DropServer(_)
978        );
979        assert_command_round_trip!(
980            "CREATE FOREIGN TABLE ext_users (id INT, name TEXT) SERVER mycsv OPTIONS (path 'users.csv')",
981            SqlCommand::CreateForeignTable(_)
982        );
983        assert_command_round_trip!(
984            "DROP FOREIGN TABLE IF EXISTS ext_users",
985            SqlCommand::DropForeignTable(_)
986        );
987    }
988
989    #[test]
990    fn sql_command_round_trips_drop_truncate_and_maintenance_variants() {
991        macro_rules! assert_command_round_trip {
992            ($input:expr, $pattern:pat) => {{
993                let command = sql_command($input);
994                assert!(
995                    matches!(command, $pattern),
996                    "unexpected command for {}",
997                    $input
998                );
999                let statement = sql_command($input).into_statement();
1000                assert!(
1001                    matches!(statement.into_command(), $pattern),
1002                    "statement round trip changed command for {}",
1003                    $input
1004                );
1005            }};
1006        }
1007
1008        assert_command_round_trip!("DROP GRAPH IF EXISTS identity", SqlCommand::DropGraph(_));
1009        assert_command_round_trip!(
1010            "DROP VECTOR IF EXISTS embeddings",
1011            SqlCommand::DropVector(_)
1012        );
1013        assert_command_round_trip!("DROP DOCUMENT IF EXISTS docs", SqlCommand::DropDocument(_));
1014        assert_command_round_trip!("DROP KV IF EXISTS settings", SqlCommand::DropKv(_));
1015        assert_command_round_trip!("DROP CONFIG IF EXISTS app", SqlCommand::DropKv(_));
1016        assert_command_round_trip!("DROP VAULT IF EXISTS secrets", SqlCommand::DropKv(_));
1017        assert_command_round_trip!(
1018            "DROP COLLECTION IF EXISTS docs",
1019            SqlCommand::DropCollection(_)
1020        );
1021        assert_command_round_trip!(
1022            "DROP TIMESERIES IF EXISTS metrics",
1023            SqlCommand::DropTimeSeries(_)
1024        );
1025        assert_command_round_trip!(
1026            "DROP HYPERTABLE IF EXISTS metrics",
1027            SqlCommand::DropTimeSeries(_)
1028        );
1029        assert_command_round_trip!("DROP QUEUE IF EXISTS tasks", SqlCommand::DropQueue(_));
1030        assert_command_round_trip!("DROP TREE IF EXISTS org IN forest", SqlCommand::DropTree(_));
1031        assert_command_round_trip!("DROP HLL IF EXISTS visitors", SqlCommand::Probabilistic(_));
1032        assert_command_round_trip!("DROP SKETCH IF EXISTS freqs", SqlCommand::Probabilistic(_));
1033        assert_command_round_trip!("DROP FILTER IF EXISTS seen", SqlCommand::Probabilistic(_));
1034        assert_command_round_trip!(
1035            "TRUNCATE VECTOR IF EXISTS embeddings",
1036            SqlCommand::Truncate(_)
1037        );
1038        assert_command_round_trip!(
1039            "COMMIT WORK",
1040            SqlCommand::TransactionControl(TxnControl::Commit)
1041        );
1042        assert_command_round_trip!(
1043            "ROLLBACK",
1044            SqlCommand::TransactionControl(TxnControl::Rollback)
1045        );
1046        assert_command_round_trip!(
1047            "SAVEPOINT before_batch",
1048            SqlCommand::TransactionControl(TxnControl::Savepoint(_))
1049        );
1050        assert_command_round_trip!(
1051            "RELEASE SAVEPOINT before_batch",
1052            SqlCommand::TransactionControl(TxnControl::ReleaseSavepoint(_))
1053        );
1054        assert_command_round_trip!(
1055            "ANALYZE users",
1056            SqlCommand::Maintenance(MaintenanceCommand::Analyze { .. })
1057        );
1058        assert_command_round_trip!(
1059            "VACUUM",
1060            SqlCommand::Maintenance(MaintenanceCommand::Vacuum { .. })
1061        );
1062    }
1063
1064    #[test]
1065    fn parse_sql_command_covers_show_and_error_branches() {
1066        assert!(matches!(
1067            sql_command("SHOW CREATE TABLE public.users"),
1068            SqlCommand::Select(TableQuery { table, .. }) if table == "red.show_create"
1069        ));
1070        assert!(matches!(
1071            sql_command("SHOW COLLECTIONS INCLUDING INTERNAL LIMIT 2"),
1072            SqlCommand::Select(TableQuery { table, limit: Some(2), .. }) if table == "red.collections"
1073        ));
1074        assert!(matches!(
1075            sql_command("SHOW QUEUES INCLUDING INTERNAL"),
1076            SqlCommand::Select(TableQuery { table, filter: None, .. }) if table == "red.queues"
1077        ));
1078        assert!(matches!(
1079            sql_command("SHOW INDICES ON users"),
1080            SqlCommand::Select(TableQuery { table, filter: Some(_), .. }) if table == "red.show_indexes"
1081        ));
1082        assert!(matches!(
1083            sql_command("SHOW POLICIES ON users WHERE action = 'SELECT'"),
1084            SqlCommand::Select(TableQuery { table, filter: Some(_), .. }) if table == "red.policies"
1085        ));
1086        assert!(matches!(
1087            sql_command("SHOW STATS 'users' WHERE rows > 0"),
1088            SqlCommand::Select(TableQuery { table, filter: Some(_), .. }) if table == "red.stats"
1089        ));
1090        assert!(matches!(
1091            sql_command("SHOW SAMPLE users"),
1092            SqlCommand::Select(TableQuery { table, limit: Some(10), .. }) if table == "users"
1093        ));
1094        assert!(matches!(
1095            sql_command("DESC public.users"),
1096            SqlCommand::Select(TableQuery { table, filter: Some(_), .. }) if table == "red.describe"
1097        ));
1098        assert!(
1099            sql_command_result("CREATE VIEW v WITH RETENTION 1 h AS SELECT * FROM users").is_err()
1100        );
1101        assert!(sql_command_result("CREATE TABLE bad WITH ANALYTICS (centrality)").is_err());
1102        assert!(sql_command_result("BEGIN ISOLATION LEVEL SERIALIZABLE").is_err());
1103        assert!(sql_command_result("EVENTS BACKFILL STATUS users").is_err());
1104    }
1105
1106    #[test]
1107    fn parse_sql_command_covers_remaining_catalog_and_copy_shapes() {
1108        for input in [
1109            "SHOW VECTORS",
1110            "SHOW DOCUMENTS",
1111            "SHOW TIMESERIES",
1112            "SHOW GRAPHS",
1113            "SHOW CONFIGS",
1114            "SHOW VAULTS",
1115            "SHOW KV",
1116            "SHOW SCHEMA public.users",
1117        ] {
1118            assert!(
1119                matches!(sql_command(input), SqlCommand::Select(_)),
1120                "{input}"
1121            );
1122        }
1123
1124        for input in [
1125            "TRUNCATE TABLE users",
1126            "TRUNCATE GRAPH identity",
1127            "TRUNCATE DOCUMENT docs",
1128            "TRUNCATE TIMESERIES metrics",
1129            "TRUNCATE METRICS metrics",
1130            "TRUNCATE KV settings",
1131            "TRUNCATE QUEUE tasks",
1132            "TRUNCATE COLLECTION docs",
1133        ] {
1134            assert!(
1135                matches!(sql_command(input), SqlCommand::Truncate(_)),
1136                "{input}"
1137            );
1138        }
1139        assert!(sql_command_result("TRUNCATE UNKNOWN users").is_err());
1140
1141        let SqlCommand::CopyFrom(copy) = sql_command("COPY users FROM '/tmp/u.csv' WITH (HEADER)")
1142        else {
1143            panic!("expected COPY");
1144        };
1145        assert!(copy.has_header);
1146        assert_eq!(copy.delimiter, None);
1147
1148        let SqlCommand::CopyFrom(copy) =
1149            sql_command("COPY users FROM '/tmp/u.csv' WITH (HEADER = false)")
1150        else {
1151            panic!("expected COPY");
1152        };
1153        assert!(!copy.has_header);
1154
1155        let SqlCommand::CopyFrom(copy) =
1156            sql_command("COPY users FROM '/tmp/u.csv' DELIMITER '|' HEADER")
1157        else {
1158            panic!("expected COPY");
1159        };
1160        assert_eq!(copy.delimiter, Some('|'));
1161        assert!(copy.has_header);
1162    }
1163
1164    #[test]
1165    fn parse_sql_command_covers_remaining_ranking_and_event_errors() {
1166        assert!(matches!(
1167            expr("APPROXIMATE RANK OF 9 IN page_rank"),
1168            QueryExpr::ApproxRankOf(RankOfQuery { ranking, entity_id })
1169                if ranking == "page_rank" && entity_id == 9
1170        ));
1171        assert!(parse_frontend("APPROX OF 7 IN page_rank").is_err());
1172        assert!(parse_frontend("RANK 1 IN page_rank").is_err());
1173        assert!(parse_frontend("RANK RANGE 0 TO 3 IN page_rank").is_err());
1174        assert!(parse_frontend("ZRANK page_rank -1").is_err());
1175        assert!(parse_frontend("ZRANGE page_rank 3 1").is_err());
1176
1177        let QueryExpr::Table(query) = expr("EVENTS STATUS 'users' WHERE active = true") else {
1178            panic!("EVENTS STATUS should accept a quoted collection");
1179        };
1180        assert_eq!(query.table, "red.subscriptions");
1181        assert!(query.filter.is_some());
1182        assert!(query.where_expr.is_some());
1183
1184        let QueryExpr::EventsBackfill(query) = expr("EVENTS BACKFILL users TO audit") else {
1185            panic!("EVENTS BACKFILL should allow omitted filter and limit");
1186        };
1187        assert_eq!(query.collection, "users");
1188        assert_eq!(query.where_filter, None);
1189        assert_eq!(query.limit, None);
1190
1191        assert!(parse_frontend("EVENTS BACKFILL users WHERE TO audit").is_err());
1192    }
1193
1194    #[test]
1195    fn parse_sql_command_covers_analytics_non_goal_rejections() {
1196        for head in ["ANALYTICS", "EVENT", "COHORT", "FUNNEL", "SLA", "ADAPTER"] {
1197            let err = sql_command_result(&format!("CREATE {head} demo"))
1198                .expect_err("analytics v0 non-goal should be rejected");
1199            assert!(err.to_string().contains(&format!("CREATE {head}")), "{err}");
1200        }
1201    }
1202
1203    #[test]
1204    fn parse_sql_command_covers_transaction_isolation_edges() {
1205        for input in [
1206            "BEGIN ISOLATION LEVEL READ UNCOMMITTED",
1207            "BEGIN ISOLATION LEVEL READ COMMITTED",
1208            "BEGIN ISOLATION LEVEL REPEATABLE READ",
1209            "START TRANSACTION ISOLATION LEVEL SNAPSHOT",
1210        ] {
1211            assert!(
1212                matches!(
1213                    sql_command(input),
1214                    SqlCommand::TransactionControl(TxnControl::Begin)
1215                ),
1216                "{input}"
1217            );
1218        }
1219
1220        assert!(sql_command_result("BEGIN ISOLATION LEVEL READ").is_err());
1221        assert!(sql_command_result("BEGIN ISOLATION LEVEL REPEATABLE").is_err());
1222        assert!(sql_command_result("BEGIN ISOLATION LEVEL CHAOS").is_err());
1223    }
1224
1225    #[test]
1226    fn parse_sql_command_covers_iam_and_hypertable_dispatch_edges() {
1227        assert!(matches!(
1228            expr("CREATE HYPERTABLE metrics TIME_COLUMN ts CHUNK_INTERVAL '1d'"),
1229            QueryExpr::CreateTimeSeries(query)
1230                if query.name == "metrics" && query.hypertable.is_some()
1231        ));
1232        assert!(sql_command_result("CREATE OR TABLE bad (id INT)").is_err());
1233        assert!(sql_command_result("DROP MATERIALIZED TABLE bad").is_err());
1234
1235        assert!(matches!(
1236            expr("ATTACH POLICY 'readonly' TO USER tenant1.alice"),
1237            QueryExpr::AttachPolicy { policy_id, .. } if policy_id == "readonly"
1238        ));
1239        assert!(matches!(
1240            expr("DETACH POLICY 'readonly' FROM GROUP analysts"),
1241            QueryExpr::DetachPolicy { policy_id, .. } if policy_id == "readonly"
1242        ));
1243        assert!(matches!(
1244            expr("SHOW POLICIES FOR USER alice"),
1245            QueryExpr::ShowPolicies { filter: Some(_) }
1246        ));
1247        assert!(matches!(
1248            expr("SHOW EFFECTIVE PERMISSIONS FOR alice"),
1249            QueryExpr::ShowEffectivePermissions { resource: None, .. }
1250        ));
1251        assert!(matches!(
1252            expr("SIMULATE alice ACTION 'iam:PassRole' ON TABLE:public.orders"),
1253            QueryExpr::SimulatePolicy { action, .. } if action == "iam:PassRole"
1254        ));
1255        assert!(matches!(
1256            expr("LINT POLICY JSON '{\"Statement\":[]}'"),
1257            QueryExpr::LintPolicy { .. }
1258        ));
1259        assert!(matches!(
1260            expr("MIGRATE POLICY MODE TO 'policy_only' DRY RUN"),
1261            QueryExpr::MigratePolicyMode {
1262                target,
1263                dry_run: true,
1264            } if target == "policy_only"
1265        ));
1266        assert!(parse_frontend("MIGRATE OTHER").is_err());
1267    }
1268
1269    #[test]
1270    fn parse_frontend_rejects_trailing_tokens() {
1271        let err = parse_frontend("SET TENANT 'acme' junk")
1272            .expect_err("parse_frontend should reject trailing tokens");
1273        assert!(
1274            err.to_string().contains("Unexpected token after query"),
1275            "{err}"
1276        );
1277    }
1278}
1279
1280fn add_table_filter(query: &mut TableQuery, filter: Filter) {
1281    let combined = match query.filter.take() {
1282        Some(existing) => existing.and(filter),
1283        None => filter,
1284    };
1285    query.where_expr = Some(filter_to_expr(&combined));
1286    query.filter = Some(combined);
1287}
1288
1289fn parse_show_collections_by_model(
1290    parser: &mut Parser<'_>,
1291    model: &str,
1292) -> Result<TableQuery, ParseError> {
1293    let mut query = TableQuery::new("red.collections");
1294    parser.parse_table_clauses(&mut query)?;
1295    add_table_filter(&mut query, collection_model_filter(model));
1296    Ok(query)
1297}
1298
1299#[derive(Debug, Clone)]
1300#[allow(clippy::large_enum_variant)]
1301pub enum SqlQuery {
1302    Select(TableQuery),
1303    Join(JoinQuery),
1304}
1305
1306#[derive(Debug, Clone)]
1307pub enum SqlMutation {
1308    Insert(InsertQuery),
1309    Update(UpdateQuery),
1310    Delete(DeleteQuery),
1311}
1312
1313#[derive(Debug, Clone)]
1314pub enum SqlSchemaCommand {
1315    ExplainAlter(ExplainAlterQuery),
1316    CreateTable(CreateTableQuery),
1317    CreateCollection(CreateCollectionQuery),
1318    CreateVector(CreateVectorQuery),
1319    DropTable(DropTableQuery),
1320    DropGraph(DropGraphQuery),
1321    DropVector(DropVectorQuery),
1322    DropDocument(DropDocumentQuery),
1323    DropKv(DropKvQuery),
1324    DropCollection(DropCollectionQuery),
1325    Truncate(TruncateQuery),
1326    AlterTable(AlterTableQuery),
1327    CreateIndex(CreateIndexQuery),
1328    DropIndex(DropIndexQuery),
1329    CreateTimeSeries(CreateTimeSeriesQuery),
1330    CreateMetric(CreateMetricQuery),
1331    AlterMetric(AlterMetricQuery),
1332    CreateSlo(CreateSloQuery),
1333    DropTimeSeries(DropTimeSeriesQuery),
1334    CreateQueue(CreateQueueQuery),
1335    AlterQueue(AlterQueueQuery),
1336    DropQueue(DropQueueQuery),
1337    CreateTree(CreateTreeQuery),
1338    DropTree(DropTreeQuery),
1339    Probabilistic(ProbabilisticCommand),
1340    CreateSchema(CreateSchemaQuery),
1341    DropSchema(DropSchemaQuery),
1342    CreateSequence(CreateSequenceQuery),
1343    DropSequence(DropSequenceQuery),
1344    CopyFrom(CopyFromQuery),
1345    CreateView(CreateViewQuery),
1346    DropView(DropViewQuery),
1347    RefreshMaterializedView(RefreshMaterializedViewQuery),
1348    CreatePolicy(CreatePolicyQuery),
1349    DropPolicy(DropPolicyQuery),
1350    CreateServer(CreateServerQuery),
1351    DropServer(DropServerQuery),
1352    CreateForeignTable(CreateForeignTableQuery),
1353    DropForeignTable(DropForeignTableQuery),
1354    CreateMigration(CreateMigrationQuery),
1355    ApplyMigration(ApplyMigrationQuery),
1356    RollbackMigration(RollbackMigrationQuery),
1357    ExplainMigration(ExplainMigrationQuery),
1358}
1359
1360#[derive(Debug, Clone)]
1361#[allow(clippy::large_enum_variant)]
1362pub enum SqlAdminCommand {
1363    SetConfig {
1364        key: String,
1365        value: Value,
1366    },
1367    ShowConfig {
1368        prefix: Option<String>,
1369        as_json: bool,
1370    },
1371    SetSecret {
1372        key: String,
1373        value: Value,
1374    },
1375    DeleteSecret {
1376        key: String,
1377    },
1378    ShowSecrets {
1379        prefix: Option<String>,
1380    },
1381    SetTenant(Option<String>),
1382    ShowTenant,
1383    TransactionControl(TxnControl),
1384    Maintenance(MaintenanceCommand),
1385    Grant(GrantStmt),
1386    Revoke(RevokeStmt),
1387    AlterUser(AlterUserStmt),
1388    CreateUser(CreateUserStmt),
1389    IamPolicy(QueryExpr),
1390}
1391
1392impl SqlStatement {
1393    pub fn into_command(self) -> SqlCommand {
1394        match self {
1395            SqlStatement::Query(SqlQuery::Select(query)) => SqlCommand::Select(query),
1396            SqlStatement::Query(SqlQuery::Join(query)) => SqlCommand::Join(query),
1397            SqlStatement::Mutation(SqlMutation::Insert(query)) => SqlCommand::Insert(query),
1398            SqlStatement::Mutation(SqlMutation::Update(query)) => SqlCommand::Update(query),
1399            SqlStatement::Mutation(SqlMutation::Delete(query)) => SqlCommand::Delete(query),
1400            SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query)) => {
1401                SqlCommand::ExplainAlter(query)
1402            }
1403            SqlStatement::Schema(SqlSchemaCommand::CreateTable(query)) => {
1404                SqlCommand::CreateTable(query)
1405            }
1406            SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query)) => {
1407                SqlCommand::CreateCollection(query)
1408            }
1409            SqlStatement::Schema(SqlSchemaCommand::CreateVector(query)) => {
1410                SqlCommand::CreateVector(query)
1411            }
1412            SqlStatement::Schema(SqlSchemaCommand::DropTable(query)) => {
1413                SqlCommand::DropTable(query)
1414            }
1415            SqlStatement::Schema(SqlSchemaCommand::DropGraph(query)) => {
1416                SqlCommand::DropGraph(query)
1417            }
1418            SqlStatement::Schema(SqlSchemaCommand::DropVector(query)) => {
1419                SqlCommand::DropVector(query)
1420            }
1421            SqlStatement::Schema(SqlSchemaCommand::DropDocument(query)) => {
1422                SqlCommand::DropDocument(query)
1423            }
1424            SqlStatement::Schema(SqlSchemaCommand::DropKv(query)) => SqlCommand::DropKv(query),
1425            SqlStatement::Schema(SqlSchemaCommand::DropCollection(query)) => {
1426                SqlCommand::DropCollection(query)
1427            }
1428            SqlStatement::Schema(SqlSchemaCommand::Truncate(query)) => SqlCommand::Truncate(query),
1429            SqlStatement::Schema(SqlSchemaCommand::AlterTable(query)) => {
1430                SqlCommand::AlterTable(query)
1431            }
1432            SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query)) => {
1433                SqlCommand::CreateIndex(query)
1434            }
1435            SqlStatement::Schema(SqlSchemaCommand::DropIndex(query)) => {
1436                SqlCommand::DropIndex(query)
1437            }
1438            SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query)) => {
1439                SqlCommand::CreateTimeSeries(query)
1440            }
1441            SqlStatement::Schema(SqlSchemaCommand::CreateMetric(query)) => {
1442                SqlCommand::CreateMetric(query)
1443            }
1444            SqlStatement::Schema(SqlSchemaCommand::AlterMetric(query)) => {
1445                SqlCommand::AlterMetric(query)
1446            }
1447            SqlStatement::Schema(SqlSchemaCommand::CreateSlo(query)) => {
1448                SqlCommand::CreateSlo(query)
1449            }
1450            SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query)) => {
1451                SqlCommand::DropTimeSeries(query)
1452            }
1453            SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query)) => {
1454                SqlCommand::CreateQueue(query)
1455            }
1456            SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query)) => {
1457                SqlCommand::AlterQueue(query)
1458            }
1459            SqlStatement::Schema(SqlSchemaCommand::DropQueue(query)) => {
1460                SqlCommand::DropQueue(query)
1461            }
1462            SqlStatement::Schema(SqlSchemaCommand::CreateTree(query)) => {
1463                SqlCommand::CreateTree(query)
1464            }
1465            SqlStatement::Schema(SqlSchemaCommand::DropTree(query)) => SqlCommand::DropTree(query),
1466            SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command)) => {
1467                SqlCommand::Probabilistic(command)
1468            }
1469            SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value }) => {
1470                SqlCommand::SetConfig { key, value }
1471            }
1472            SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix, as_json }) => {
1473                SqlCommand::ShowConfig { prefix, as_json }
1474            }
1475            SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value }) => {
1476                SqlCommand::SetSecret { key, value }
1477            }
1478            SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key }) => {
1479                SqlCommand::DeleteSecret { key }
1480            }
1481            SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix }) => {
1482                SqlCommand::ShowSecrets { prefix }
1483            }
1484            SqlStatement::Admin(SqlAdminCommand::SetTenant(value)) => SqlCommand::SetTenant(value),
1485            SqlStatement::Admin(SqlAdminCommand::ShowTenant) => SqlCommand::ShowTenant,
1486            SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl)) => {
1487                SqlCommand::TransactionControl(ctl)
1488            }
1489            SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)) => SqlCommand::Maintenance(cmd),
1490            SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)) => SqlCommand::CreateSchema(q),
1491            SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)) => SqlCommand::DropSchema(q),
1492            SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q)) => {
1493                SqlCommand::CreateSequence(q)
1494            }
1495            SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)) => SqlCommand::DropSequence(q),
1496            SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)) => SqlCommand::CopyFrom(q),
1497            SqlStatement::Schema(SqlSchemaCommand::CreateView(q)) => SqlCommand::CreateView(q),
1498            SqlStatement::Schema(SqlSchemaCommand::DropView(q)) => SqlCommand::DropView(q),
1499            SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q)) => {
1500                SqlCommand::RefreshMaterializedView(q)
1501            }
1502            SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)) => SqlCommand::CreatePolicy(q),
1503            SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)) => SqlCommand::DropPolicy(q),
1504            SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)) => SqlCommand::CreateServer(q),
1505            SqlStatement::Schema(SqlSchemaCommand::DropServer(q)) => SqlCommand::DropServer(q),
1506            SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q)) => {
1507                SqlCommand::CreateForeignTable(q)
1508            }
1509            SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q)) => {
1510                SqlCommand::DropForeignTable(q)
1511            }
1512            SqlStatement::Admin(SqlAdminCommand::Grant(s)) => SqlCommand::Grant(s),
1513            SqlStatement::Admin(SqlAdminCommand::Revoke(s)) => SqlCommand::Revoke(s),
1514            SqlStatement::Admin(SqlAdminCommand::AlterUser(s)) => SqlCommand::AlterUser(s),
1515            SqlStatement::Admin(SqlAdminCommand::CreateUser(s)) => SqlCommand::CreateUser(s),
1516            SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)) => SqlCommand::IamPolicy(e),
1517            SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q)) => {
1518                SqlCommand::CreateMigration(q)
1519            }
1520            SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q)) => {
1521                SqlCommand::ApplyMigration(q)
1522            }
1523            SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q)) => {
1524                SqlCommand::RollbackMigration(q)
1525            }
1526            SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q)) => {
1527                SqlCommand::ExplainMigration(q)
1528            }
1529        }
1530    }
1531
1532    pub fn into_query_expr(self) -> QueryExpr {
1533        self.into_command().into_query_expr()
1534    }
1535}
1536
1537impl FrontendStatement {
1538    pub fn into_query_expr(self) -> QueryExpr {
1539        match self {
1540            FrontendStatement::Sql(statement) => statement.into_query_expr(),
1541            FrontendStatement::Graph(query) => QueryExpr::Graph(query),
1542            FrontendStatement::GraphCommand(command) => QueryExpr::GraphCommand(command),
1543            FrontendStatement::Path(query) => QueryExpr::Path(query),
1544            FrontendStatement::Vector(query) => QueryExpr::Vector(query),
1545            FrontendStatement::Hybrid(query) => QueryExpr::Hybrid(query),
1546            FrontendStatement::Search(command) => QueryExpr::SearchCommand(command),
1547            FrontendStatement::Ask(query) => QueryExpr::Ask(query),
1548            FrontendStatement::QueueSelect(query) => QueryExpr::QueueSelect(query),
1549            FrontendStatement::QueueCommand(command) => QueryExpr::QueueCommand(command),
1550            FrontendStatement::EventsBackfill(query) => QueryExpr::EventsBackfill(query),
1551            FrontendStatement::EventsBackfillStatus { collection } => {
1552                QueryExpr::EventsBackfillStatus { collection }
1553            }
1554            FrontendStatement::TreeCommand(command) => QueryExpr::TreeCommand(command),
1555            FrontendStatement::ProbabilisticCommand(command) => {
1556                QueryExpr::ProbabilisticCommand(command)
1557            }
1558            FrontendStatement::KvCommand(command) => QueryExpr::KvCommand(command),
1559            FrontendStatement::ConfigCommand(command) => QueryExpr::ConfigCommand(command),
1560            FrontendStatement::Ranking(expr) => expr,
1561        }
1562    }
1563}
1564
1565pub fn parse_frontend(input: &str) -> Result<FrontendStatement, ParseError> {
1566    let mut parser = Parser::new(input)?;
1567    let statement = parser.parse_frontend_statement()?;
1568    if !parser.check(&Token::Eof) {
1569        return Err(ParseError::new(
1570            // F-05: `Token::Ident` / `Token::String` / `Token::JsonLiteral`
1571            // Display arms emit raw user bytes. Render via `{:?}` so
1572            // embedded CR/LF/NUL/quotes are escaped before the message
1573            // reaches downstream JSON / audit / log / gRPC sinks.
1574            format!("Unexpected token after query: {:?}", parser.current.token),
1575            parser.position(),
1576        ));
1577    }
1578    Ok(statement)
1579}
1580
1581impl SqlCommand {
1582    pub fn into_query_expr(self) -> QueryExpr {
1583        match self {
1584            SqlCommand::Select(query) => QueryExpr::Table(query),
1585            SqlCommand::Join(query) => QueryExpr::Join(query),
1586            SqlCommand::Insert(query) => QueryExpr::Insert(query),
1587            SqlCommand::Update(query) => QueryExpr::Update(query),
1588            SqlCommand::Delete(query) => QueryExpr::Delete(query),
1589            SqlCommand::ExplainAlter(query) => QueryExpr::ExplainAlter(query),
1590            SqlCommand::CreateTable(query) => QueryExpr::CreateTable(query),
1591            SqlCommand::CreateCollection(query) => QueryExpr::CreateCollection(query),
1592            SqlCommand::CreateVector(query) => QueryExpr::CreateVector(query),
1593            SqlCommand::DropTable(query) => QueryExpr::DropTable(query),
1594            SqlCommand::DropGraph(query) => QueryExpr::DropGraph(query),
1595            SqlCommand::DropVector(query) => QueryExpr::DropVector(query),
1596            SqlCommand::DropDocument(query) => QueryExpr::DropDocument(query),
1597            SqlCommand::DropKv(query) => QueryExpr::DropKv(query),
1598            SqlCommand::DropCollection(query) => QueryExpr::DropCollection(query),
1599            SqlCommand::Truncate(query) => QueryExpr::Truncate(query),
1600            SqlCommand::AlterTable(query) => QueryExpr::AlterTable(query),
1601            SqlCommand::CreateIndex(query) => QueryExpr::CreateIndex(query),
1602            SqlCommand::DropIndex(query) => QueryExpr::DropIndex(query),
1603            SqlCommand::CreateTimeSeries(query) => QueryExpr::CreateTimeSeries(query),
1604            SqlCommand::CreateMetric(query) => QueryExpr::CreateMetric(query),
1605            SqlCommand::AlterMetric(query) => QueryExpr::AlterMetric(query),
1606            SqlCommand::CreateSlo(query) => QueryExpr::CreateSlo(query),
1607            SqlCommand::DropTimeSeries(query) => QueryExpr::DropTimeSeries(query),
1608            SqlCommand::CreateQueue(query) => QueryExpr::CreateQueue(query),
1609            SqlCommand::AlterQueue(query) => QueryExpr::AlterQueue(query),
1610            SqlCommand::DropQueue(query) => QueryExpr::DropQueue(query),
1611            SqlCommand::CreateTree(query) => QueryExpr::CreateTree(query),
1612            SqlCommand::DropTree(query) => QueryExpr::DropTree(query),
1613            SqlCommand::Probabilistic(command) => QueryExpr::ProbabilisticCommand(command),
1614            SqlCommand::SetConfig { key, value } => QueryExpr::SetConfig { key, value },
1615            SqlCommand::ShowConfig { prefix, as_json } => QueryExpr::ShowConfig { prefix, as_json },
1616            SqlCommand::SetSecret { key, value } => QueryExpr::SetSecret { key, value },
1617            SqlCommand::DeleteSecret { key } => QueryExpr::DeleteSecret { key },
1618            SqlCommand::ShowSecrets { prefix } => QueryExpr::ShowSecrets { prefix },
1619            SqlCommand::SetTenant(value) => QueryExpr::SetTenant(value),
1620            SqlCommand::ShowTenant => QueryExpr::ShowTenant,
1621            SqlCommand::TransactionControl(ctl) => QueryExpr::TransactionControl(ctl),
1622            SqlCommand::Maintenance(cmd) => QueryExpr::MaintenanceCommand(cmd),
1623            SqlCommand::CreateSchema(q) => QueryExpr::CreateSchema(q),
1624            SqlCommand::DropSchema(q) => QueryExpr::DropSchema(q),
1625            SqlCommand::CreateSequence(q) => QueryExpr::CreateSequence(q),
1626            SqlCommand::DropSequence(q) => QueryExpr::DropSequence(q),
1627            SqlCommand::CopyFrom(q) => QueryExpr::CopyFrom(q),
1628            SqlCommand::CreateView(q) => QueryExpr::CreateView(q),
1629            SqlCommand::DropView(q) => QueryExpr::DropView(q),
1630            SqlCommand::RefreshMaterializedView(q) => QueryExpr::RefreshMaterializedView(q),
1631            SqlCommand::CreatePolicy(q) => QueryExpr::CreatePolicy(q),
1632            SqlCommand::DropPolicy(q) => QueryExpr::DropPolicy(q),
1633            SqlCommand::CreateServer(q) => QueryExpr::CreateServer(q),
1634            SqlCommand::DropServer(q) => QueryExpr::DropServer(q),
1635            SqlCommand::CreateForeignTable(q) => QueryExpr::CreateForeignTable(q),
1636            SqlCommand::DropForeignTable(q) => QueryExpr::DropForeignTable(q),
1637            SqlCommand::Grant(s) => QueryExpr::Grant(s),
1638            SqlCommand::Revoke(s) => QueryExpr::Revoke(s),
1639            SqlCommand::AlterUser(s) => QueryExpr::AlterUser(s),
1640            SqlCommand::CreateUser(s) => QueryExpr::CreateUser(s),
1641            SqlCommand::IamPolicy(e) => e,
1642            SqlCommand::CreateMigration(q) => QueryExpr::CreateMigration(q),
1643            SqlCommand::ApplyMigration(q) => QueryExpr::ApplyMigration(q),
1644            SqlCommand::RollbackMigration(q) => QueryExpr::RollbackMigration(q),
1645            SqlCommand::ExplainMigration(q) => QueryExpr::ExplainMigration(q),
1646        }
1647    }
1648
1649    pub fn into_statement(self) -> SqlStatement {
1650        match self {
1651            SqlCommand::Select(query) => SqlStatement::Query(SqlQuery::Select(query)),
1652            SqlCommand::Join(query) => SqlStatement::Query(SqlQuery::Join(query)),
1653            SqlCommand::Insert(query) => SqlStatement::Mutation(SqlMutation::Insert(query)),
1654            SqlCommand::Update(query) => SqlStatement::Mutation(SqlMutation::Update(query)),
1655            SqlCommand::Delete(query) => SqlStatement::Mutation(SqlMutation::Delete(query)),
1656            SqlCommand::ExplainAlter(query) => {
1657                SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query))
1658            }
1659            SqlCommand::CreateTable(query) => {
1660                SqlStatement::Schema(SqlSchemaCommand::CreateTable(query))
1661            }
1662            SqlCommand::CreateCollection(query) => {
1663                SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query))
1664            }
1665            SqlCommand::CreateVector(query) => {
1666                SqlStatement::Schema(SqlSchemaCommand::CreateVector(query))
1667            }
1668            SqlCommand::DropTable(query) => {
1669                SqlStatement::Schema(SqlSchemaCommand::DropTable(query))
1670            }
1671            SqlCommand::DropGraph(query) => {
1672                SqlStatement::Schema(SqlSchemaCommand::DropGraph(query))
1673            }
1674            SqlCommand::DropVector(query) => {
1675                SqlStatement::Schema(SqlSchemaCommand::DropVector(query))
1676            }
1677            SqlCommand::DropDocument(query) => {
1678                SqlStatement::Schema(SqlSchemaCommand::DropDocument(query))
1679            }
1680            SqlCommand::DropKv(query) => SqlStatement::Schema(SqlSchemaCommand::DropKv(query)),
1681            SqlCommand::DropCollection(query) => {
1682                SqlStatement::Schema(SqlSchemaCommand::DropCollection(query))
1683            }
1684            SqlCommand::Truncate(query) => SqlStatement::Schema(SqlSchemaCommand::Truncate(query)),
1685            SqlCommand::AlterTable(query) => {
1686                SqlStatement::Schema(SqlSchemaCommand::AlterTable(query))
1687            }
1688            SqlCommand::CreateIndex(query) => {
1689                SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query))
1690            }
1691            SqlCommand::DropIndex(query) => {
1692                SqlStatement::Schema(SqlSchemaCommand::DropIndex(query))
1693            }
1694            SqlCommand::CreateTimeSeries(query) => {
1695                SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query))
1696            }
1697            SqlCommand::CreateMetric(query) => {
1698                SqlStatement::Schema(SqlSchemaCommand::CreateMetric(query))
1699            }
1700            SqlCommand::AlterMetric(query) => {
1701                SqlStatement::Schema(SqlSchemaCommand::AlterMetric(query))
1702            }
1703            SqlCommand::CreateSlo(query) => {
1704                SqlStatement::Schema(SqlSchemaCommand::CreateSlo(query))
1705            }
1706            SqlCommand::DropTimeSeries(query) => {
1707                SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query))
1708            }
1709            SqlCommand::CreateQueue(query) => {
1710                SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query))
1711            }
1712            SqlCommand::AlterQueue(query) => {
1713                SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query))
1714            }
1715            SqlCommand::DropQueue(query) => {
1716                SqlStatement::Schema(SqlSchemaCommand::DropQueue(query))
1717            }
1718            SqlCommand::CreateTree(query) => {
1719                SqlStatement::Schema(SqlSchemaCommand::CreateTree(query))
1720            }
1721            SqlCommand::DropTree(query) => SqlStatement::Schema(SqlSchemaCommand::DropTree(query)),
1722            SqlCommand::Probabilistic(command) => {
1723                SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command))
1724            }
1725            SqlCommand::SetConfig { key, value } => {
1726                SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value })
1727            }
1728            SqlCommand::ShowConfig { prefix, as_json } => {
1729                SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix, as_json })
1730            }
1731            SqlCommand::SetSecret { key, value } => {
1732                SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value })
1733            }
1734            SqlCommand::DeleteSecret { key } => {
1735                SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key })
1736            }
1737            SqlCommand::ShowSecrets { prefix } => {
1738                SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix })
1739            }
1740            SqlCommand::SetTenant(value) => SqlStatement::Admin(SqlAdminCommand::SetTenant(value)),
1741            SqlCommand::ShowTenant => SqlStatement::Admin(SqlAdminCommand::ShowTenant),
1742            SqlCommand::TransactionControl(ctl) => {
1743                SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl))
1744            }
1745            SqlCommand::Maintenance(cmd) => SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)),
1746            SqlCommand::CreateSchema(q) => SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)),
1747            SqlCommand::DropSchema(q) => SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)),
1748            SqlCommand::CreateSequence(q) => {
1749                SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q))
1750            }
1751            SqlCommand::DropSequence(q) => SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)),
1752            SqlCommand::CopyFrom(q) => SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)),
1753            SqlCommand::CreateView(q) => SqlStatement::Schema(SqlSchemaCommand::CreateView(q)),
1754            SqlCommand::DropView(q) => SqlStatement::Schema(SqlSchemaCommand::DropView(q)),
1755            SqlCommand::RefreshMaterializedView(q) => {
1756                SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q))
1757            }
1758            SqlCommand::CreatePolicy(q) => SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)),
1759            SqlCommand::DropPolicy(q) => SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)),
1760            SqlCommand::CreateServer(q) => SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)),
1761            SqlCommand::DropServer(q) => SqlStatement::Schema(SqlSchemaCommand::DropServer(q)),
1762            SqlCommand::CreateForeignTable(q) => {
1763                SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q))
1764            }
1765            SqlCommand::DropForeignTable(q) => {
1766                SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q))
1767            }
1768            SqlCommand::Grant(s) => SqlStatement::Admin(SqlAdminCommand::Grant(s)),
1769            SqlCommand::Revoke(s) => SqlStatement::Admin(SqlAdminCommand::Revoke(s)),
1770            SqlCommand::AlterUser(s) => SqlStatement::Admin(SqlAdminCommand::AlterUser(s)),
1771            SqlCommand::CreateUser(s) => SqlStatement::Admin(SqlAdminCommand::CreateUser(s)),
1772            SqlCommand::IamPolicy(e) => SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)),
1773            SqlCommand::CreateMigration(q) => {
1774                SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q))
1775            }
1776            SqlCommand::ApplyMigration(q) => {
1777                SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q))
1778            }
1779            SqlCommand::RollbackMigration(q) => {
1780                SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q))
1781            }
1782            SqlCommand::ExplainMigration(q) => {
1783                SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q))
1784            }
1785        }
1786    }
1787}
1788
1789impl<'a> Parser<'a> {
1790    fn parse_events_command(&mut self) -> Result<QueryExpr, ParseError> {
1791        self.expect_ident()?; // EVENTS
1792        if self.consume_ident_ci("STATUS")? {
1793            let mut query = TableQuery::new("red.subscriptions");
1794            let collection = match self.peek().clone() {
1795                Token::Ident(name) => {
1796                    self.advance()?;
1797                    Some(name)
1798                }
1799                Token::String(name) => {
1800                    self.advance()?;
1801                    Some(name)
1802                }
1803                _ => None,
1804            };
1805            self.parse_table_clauses(&mut query)?;
1806            if let Some(collection) = collection {
1807                let filter = Filter::compare(
1808                    FieldRef::column("red.subscriptions", "collection"),
1809                    CompareOp::Eq,
1810                    Value::text(collection),
1811                );
1812                let expr = filter_to_expr(&filter);
1813                query.where_expr = Some(match query.where_expr.take() {
1814                    Some(existing) => Expr::binop(BinOp::And, existing, expr),
1815                    None => expr,
1816                });
1817                query.filter = Some(match query.filter.take() {
1818                    Some(existing) => existing.and(filter),
1819                    None => filter,
1820                });
1821            }
1822            return Ok(QueryExpr::Table(query));
1823        }
1824
1825        if !self.consume_ident_ci("BACKFILL")? {
1826            return Err(ParseError::expected(
1827                vec!["BACKFILL", "STATUS"],
1828                self.peek(),
1829                self.position(),
1830            ));
1831        }
1832
1833        if self.consume_ident_ci("STATUS")? {
1834            let collection = self.expect_ident()?;
1835            return Ok(QueryExpr::EventsBackfillStatus { collection });
1836        }
1837
1838        let collection = self.expect_ident()?;
1839        let where_filter = if self.consume(&Token::Where)? {
1840            let mut parts = Vec::new();
1841            while !self.check(&Token::Eof) && !self.check(&Token::To) {
1842                parts.push(self.peek().to_string());
1843                self.advance()?;
1844            }
1845            if parts.is_empty() {
1846                return Err(ParseError::expected(
1847                    vec!["predicate"],
1848                    self.peek(),
1849                    self.position(),
1850                ));
1851            }
1852            Some(parts.join(" "))
1853        } else {
1854            None
1855        };
1856
1857        self.expect(Token::To)?;
1858        let target_queue = self.expect_ident()?;
1859        let limit = if self.consume(&Token::Limit)? {
1860            Some(self.parse_positive_integer("LIMIT")? as u64)
1861        } else {
1862            None
1863        };
1864
1865        Ok(QueryExpr::EventsBackfill(EventsBackfillQuery {
1866            collection,
1867            where_filter,
1868            target_queue,
1869            limit,
1870        }))
1871    }
1872
1873    /// Parse an optional `OPTIONS (key 'value', key2 'value2', ...)` clause
1874    /// used by Phase 3.2 FDW DDL statements. Returns an empty vec when the
1875    /// clause is absent. Values are always single-quoted string literals —
1876    /// consistent with PG's generic-options model.
1877    pub(crate) fn parse_fdw_options_clause(&mut self) -> Result<Vec<(String, String)>, ParseError> {
1878        if !self.consume(&Token::Options)? {
1879            return Ok(Vec::new());
1880        }
1881        self.expect(Token::LParen)?;
1882        let mut out: Vec<(String, String)> = Vec::new();
1883        loop {
1884            // Option keys frequently collide with reserved words
1885            // (`path`, `format`, `delimiter`, `header`, …) — accept
1886            // the keyword form and lowercase it so downstream
1887            // option-name matching stays case-insensitive.
1888            let was_ident = matches!(self.peek(), Token::Ident(_));
1889            let raw = self.expect_ident_or_keyword()?;
1890            let key = if was_ident {
1891                raw
1892            } else {
1893                raw.to_ascii_lowercase()
1894            };
1895            // Value is a single-quoted string literal.
1896            let value = self.parse_string()?;
1897            out.push((key, value));
1898            if !self.consume(&Token::Comma)? {
1899                break;
1900            }
1901        }
1902        self.expect(Token::RParen)?;
1903        Ok(out)
1904    }
1905
1906    /// Parse any top-level frontend statement through a single shared surface.
1907    pub fn parse_frontend_statement(&mut self) -> Result<FrontendStatement, ParseError> {
1908        match self.peek() {
1909            Token::Select => match self.parse_select_query()? {
1910                QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
1911                    SqlQuery::Select(query),
1912                ))),
1913                QueryExpr::Join(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
1914                    SqlQuery::Join(query),
1915                ))),
1916                QueryExpr::QueueSelect(query) => Ok(FrontendStatement::QueueSelect(query)),
1917                other => Err(ParseError::new(
1918                    format!("internal: SELECT produced unexpected query kind {other:?}"),
1919                    self.position(),
1920                )),
1921            },
1922            Token::From
1923            | Token::Insert
1924            | Token::Update
1925            | Token::Truncate
1926            | Token::Create
1927            | Token::Drop
1928            | Token::Alter
1929            | Token::Set
1930            | Token::Begin
1931            | Token::Commit
1932            | Token::Rollback
1933            | Token::Savepoint
1934            | Token::Release
1935            | Token::Start
1936            | Token::Vacuum
1937            | Token::Analyze
1938            | Token::Copy
1939            | Token::Refresh => self.parse_sql_statement().map(FrontendStatement::Sql),
1940            Token::Explain => {
1941                if matches!(
1942                    self.peek_next()?,
1943                    Token::Ident(name) if name.eq_ignore_ascii_case("ASK")
1944                ) {
1945                    match self.parse_explain_ask_query()? {
1946                        QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
1947                        other => Err(ParseError::new(
1948                            format!(
1949                                "internal: EXPLAIN ASK produced unexpected query kind {other:?}"
1950                            ),
1951                            self.position(),
1952                        )),
1953                    }
1954                } else {
1955                    self.parse_sql_statement().map(FrontendStatement::Sql)
1956                }
1957            }
1958            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
1959                self.parse_sql_statement().map(FrontendStatement::Sql)
1960            }
1961            Token::Ident(name) if name.eq_ignore_ascii_case("RESET") => {
1962                self.parse_sql_statement().map(FrontendStatement::Sql)
1963            }
1964            Token::Ident(name)
1965                if name.eq_ignore_ascii_case("RANK")
1966                    || name.eq_ignore_ascii_case("APPROX")
1967                    || name.eq_ignore_ascii_case("APPROXIMATE")
1968                    || name.eq_ignore_ascii_case("ZRANK")
1969                    || name.eq_ignore_ascii_case("ZRANGE") =>
1970            {
1971                self.parse_ranking_read().map(FrontendStatement::Ranking)
1972            }
1973            Token::Desc => self.parse_sql_statement().map(FrontendStatement::Sql),
1974            Token::Ident(name)
1975                if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
1976            {
1977                self.parse_sql_statement().map(FrontendStatement::Sql)
1978            }
1979            Token::Ident(name)
1980                if name.eq_ignore_ascii_case("GRANT")
1981                    || name.eq_ignore_ascii_case("REVOKE")
1982                    || name.eq_ignore_ascii_case("SIMULATE")
1983                    || name.eq_ignore_ascii_case("LINT")
1984                    || name.eq_ignore_ascii_case("MIGRATE")
1985                    || name.eq_ignore_ascii_case("APPLY") =>
1986            {
1987                self.parse_sql_statement().map(FrontendStatement::Sql)
1988            }
1989            Token::Ident(name) if name.eq_ignore_ascii_case("WATCH") => {
1990                self.advance()?;
1991                if matches!(
1992                    self.peek(),
1993                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
1994                ) {
1995                    match self.parse_config_watch_after_watch()? {
1996                        QueryExpr::ConfigCommand(command) => {
1997                            Ok(FrontendStatement::ConfigCommand(command))
1998                        }
1999                        other => Err(ParseError::new(
2000                            format!(
2001                                "internal: WATCH CONFIG produced unexpected query kind {other:?}"
2002                            ),
2003                            self.position(),
2004                        )),
2005                    }
2006                } else if matches!(
2007                    self.peek(),
2008                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
2009                ) {
2010                    match self.parse_vault_watch_after_watch()? {
2011                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2012                        other => Err(ParseError::new(
2013                            format!(
2014                                "internal: WATCH VAULT produced unexpected query kind {other:?}"
2015                            ),
2016                            self.position(),
2017                        )),
2018                    }
2019                } else {
2020                    match self.parse_kv_watch(reddb_types::catalog::CollectionModel::Kv)? {
2021                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2022                        other => Err(ParseError::new(
2023                            format!("internal: WATCH produced unexpected query kind {other:?}"),
2024                            self.position(),
2025                        )),
2026                    }
2027                }
2028            }
2029            Token::List => {
2030                self.advance()?;
2031                if matches!(
2032                    self.peek(),
2033                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
2034                ) {
2035                    match self.parse_config_list_after_list()? {
2036                        QueryExpr::ConfigCommand(command) => {
2037                            Ok(FrontendStatement::ConfigCommand(command))
2038                        }
2039                        other => Err(ParseError::new(
2040                            format!(
2041                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
2042                            ),
2043                            self.position(),
2044                        )),
2045                    }
2046                } else if matches!(self.peek(), Token::Kv) {
2047                    match self.parse_kv_list_after_list()? {
2048                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2049                        other => Err(ParseError::new(
2050                            format!("internal: LIST KV produced unexpected query kind {other:?}"),
2051                            self.position(),
2052                        )),
2053                    }
2054                } else if matches!(
2055                    self.peek(),
2056                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
2057                ) {
2058                    match self.parse_vault_list_after_list()? {
2059                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2060                        other => Err(ParseError::new(
2061                            format!(
2062                                "internal: LIST VAULT produced unexpected query kind {other:?}"
2063                            ),
2064                            self.position(),
2065                        )),
2066                    }
2067                } else {
2068                    Err(ParseError::expected(
2069                        vec!["CONFIG", "KV", "VAULT"],
2070                        self.peek(),
2071                        self.position(),
2072                    ))
2073                }
2074            }
2075            Token::Ident(name) if name.eq_ignore_ascii_case("LIST") => {
2076                self.advance()?;
2077                if matches!(
2078                    self.peek(),
2079                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
2080                ) {
2081                    match self.parse_config_list_after_list()? {
2082                        QueryExpr::ConfigCommand(command) => {
2083                            Ok(FrontendStatement::ConfigCommand(command))
2084                        }
2085                        other => Err(ParseError::new(
2086                            format!(
2087                                "internal: LIST CONFIG produced unexpected query kind {other:?}"
2088                            ),
2089                            self.position(),
2090                        )),
2091                    }
2092                } else if matches!(self.peek(), Token::Kv) {
2093                    match self.parse_kv_list_after_list()? {
2094                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2095                        other => Err(ParseError::new(
2096                            format!("internal: LIST KV produced unexpected query kind {other:?}"),
2097                            self.position(),
2098                        )),
2099                    }
2100                } else if matches!(
2101                    self.peek(),
2102                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
2103                ) {
2104                    match self.parse_vault_list_after_list()? {
2105                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2106                        other => Err(ParseError::new(
2107                            format!(
2108                                "internal: LIST VAULT produced unexpected query kind {other:?}"
2109                            ),
2110                            self.position(),
2111                        )),
2112                    }
2113                } else {
2114                    Err(ParseError::expected(
2115                        vec!["CONFIG", "KV", "VAULT"],
2116                        self.peek(),
2117                        self.position(),
2118                    ))
2119                }
2120            }
2121            Token::Ident(name) if name.eq_ignore_ascii_case("INVALIDATE") => {
2122                if matches!(
2123                    self.peek_next()?,
2124                    Token::Ident(next) if next.eq_ignore_ascii_case("CONFIG")
2125                ) {
2126                    match self.parse_config_command()? {
2127                        QueryExpr::ConfigCommand(command) => {
2128                            Ok(FrontendStatement::ConfigCommand(command))
2129                        }
2130                        other => Err(ParseError::new(
2131                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
2132                            self.position(),
2133                        )),
2134                    }
2135                } else {
2136                    self.advance()?;
2137                    match self.parse_kv_invalidate_tags_after_invalidate()? {
2138                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2139                        other => Err(ParseError::new(
2140                            format!(
2141                                "internal: INVALIDATE produced unexpected query kind {other:?}"
2142                            ),
2143                            self.position(),
2144                        )),
2145                    }
2146                }
2147            }
2148            Token::Attach | Token::Detach => self.parse_sql_statement().map(FrontendStatement::Sql),
2149            Token::Match => match self.parse_match_query()? {
2150                QueryExpr::Graph(query) => Ok(FrontendStatement::Graph(query)),
2151                other => Err(ParseError::new(
2152                    format!("internal: MATCH produced unexpected query kind {other:?}"),
2153                    self.position(),
2154                )),
2155            },
2156            Token::Path => match self.parse_path_query()? {
2157                QueryExpr::Path(query) => Ok(FrontendStatement::Path(query)),
2158                other => Err(ParseError::new(
2159                    format!("internal: PATH produced unexpected query kind {other:?}"),
2160                    self.position(),
2161                )),
2162            },
2163            Token::Vector => match self.parse_vector_query()? {
2164                QueryExpr::Vector(query) => Ok(FrontendStatement::Vector(query)),
2165                other => Err(ParseError::new(
2166                    format!("internal: VECTOR produced unexpected query kind {other:?}"),
2167                    self.position(),
2168                )),
2169            },
2170            Token::Hybrid => match self.parse_hybrid_query()? {
2171                QueryExpr::Hybrid(query) => Ok(FrontendStatement::Hybrid(query)),
2172                other => Err(ParseError::new(
2173                    format!("internal: HYBRID produced unexpected query kind {other:?}"),
2174                    self.position(),
2175                )),
2176            },
2177            Token::Graph => match self.parse_graph_command()? {
2178                QueryExpr::GraphCommand(command) => Ok(FrontendStatement::GraphCommand(command)),
2179                other => Err(ParseError::new(
2180                    format!("internal: GRAPH produced unexpected query kind {other:?}"),
2181                    self.position(),
2182                )),
2183            },
2184            Token::Search => match self.parse_search_command()? {
2185                QueryExpr::SearchCommand(command) => Ok(FrontendStatement::Search(command)),
2186                other => Err(ParseError::new(
2187                    format!("internal: SEARCH produced unexpected query kind {other:?}"),
2188                    self.position(),
2189                )),
2190            },
2191            Token::Ident(name) if name.eq_ignore_ascii_case("ASK") => {
2192                match self.parse_ask_query()? {
2193                    QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
2194                    other => Err(ParseError::new(
2195                        format!("internal: ASK produced unexpected query kind {other:?}"),
2196                        self.position(),
2197                    )),
2198                }
2199            }
2200            Token::Ident(name) if name.eq_ignore_ascii_case("UNSEAL") => {
2201                match self.parse_unseal_vault_command()? {
2202                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2203                    other => Err(ParseError::new(
2204                        format!("internal: UNSEAL VAULT produced unexpected query kind {other:?}"),
2205                        self.position(),
2206                    )),
2207                }
2208            }
2209            Token::Queue => match self.parse_queue_command()? {
2210                QueryExpr::QueueCommand(command) => Ok(FrontendStatement::QueueCommand(command)),
2211                other => Err(ParseError::new(
2212                    format!("internal: QUEUE produced unexpected query kind {other:?}"),
2213                    self.position(),
2214                )),
2215            },
2216            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
2217                match self.parse_events_command()? {
2218                    QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
2219                        SqlQuery::Select(query),
2220                    ))),
2221                    QueryExpr::EventsBackfill(query) => {
2222                        Ok(FrontendStatement::EventsBackfill(query))
2223                    }
2224                    QueryExpr::EventsBackfillStatus { collection } => {
2225                        Ok(FrontendStatement::EventsBackfillStatus { collection })
2226                    }
2227                    other => Err(ParseError::new(
2228                        format!("internal: EVENTS produced unexpected query kind {other:?}"),
2229                        self.position(),
2230                    )),
2231                }
2232            }
2233            Token::Kv => match self.parse_kv_command()? {
2234                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2235                other => Err(ParseError::new(
2236                    format!("internal: KV produced unexpected query kind {other:?}"),
2237                    self.position(),
2238                )),
2239            },
2240            Token::Delete => {
2241                if matches!(
2242                    self.peek_next()?,
2243                    Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
2244                ) {
2245                    match self.parse_config_command()? {
2246                        QueryExpr::ConfigCommand(command) => {
2247                            Ok(FrontendStatement::ConfigCommand(command))
2248                        }
2249                        other => Err(ParseError::new(
2250                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
2251                            self.position(),
2252                        )),
2253                    }
2254                } else if matches!(
2255                    self.peek_next()?,
2256                    Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
2257                ) {
2258                    match self.parse_vault_lifecycle_command()? {
2259                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2260                        other => Err(ParseError::new(
2261                            format!("internal: VAULT produced unexpected query kind {other:?}"),
2262                            self.position(),
2263                        )),
2264                    }
2265                } else {
2266                    self.parse_sql_statement().map(FrontendStatement::Sql)
2267                }
2268            }
2269            Token::Add => match self.parse_config_command()? {
2270                QueryExpr::ConfigCommand(command) => Ok(FrontendStatement::ConfigCommand(command)),
2271                other => Err(ParseError::new(
2272                    format!("internal: CONFIG produced unexpected query kind {other:?}"),
2273                    self.position(),
2274                )),
2275            },
2276            Token::Purge => match self.parse_vault_lifecycle_command()? {
2277                QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2278                other => Err(ParseError::new(
2279                    format!("internal: VAULT produced unexpected query kind {other:?}"),
2280                    self.position(),
2281                )),
2282            },
2283            Token::Ident(name)
2284                if name.eq_ignore_ascii_case("PUT")
2285                    || name.eq_ignore_ascii_case("GET")
2286                    || name.eq_ignore_ascii_case("RESOLVE")
2287                    || name.eq_ignore_ascii_case("ROTATE")
2288                    || name.eq_ignore_ascii_case("HISTORY")
2289                    || name.eq_ignore_ascii_case("PURGE")
2290                    || name.eq_ignore_ascii_case("INCR")
2291                    || name.eq_ignore_ascii_case("DECR")
2292                    || name.eq_ignore_ascii_case("INVALIDATE") =>
2293            {
2294                if matches!(
2295                    self.peek_next()?,
2296                    Token::Ident(next) if next.eq_ignore_ascii_case("VAULT")
2297                ) {
2298                    match self.parse_vault_lifecycle_command()? {
2299                        QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2300                        other => Err(ParseError::new(
2301                            format!("internal: VAULT produced unexpected query kind {other:?}"),
2302                            self.position(),
2303                        )),
2304                    }
2305                } else {
2306                    match self.parse_config_command()? {
2307                        QueryExpr::ConfigCommand(command) => {
2308                            Ok(FrontendStatement::ConfigCommand(command))
2309                        }
2310                        other => Err(ParseError::new(
2311                            format!("internal: CONFIG produced unexpected query kind {other:?}"),
2312                            self.position(),
2313                        )),
2314                    }
2315                }
2316            }
2317            Token::Ident(name) if name.eq_ignore_ascii_case("VAULT") => {
2318                match self.parse_vault_command()? {
2319                    QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2320                    other => Err(ParseError::new(
2321                        format!("internal: VAULT produced unexpected query kind {other:?}"),
2322                        self.position(),
2323                    )),
2324                }
2325            }
2326            Token::Tree => match self.parse_tree_command()? {
2327                QueryExpr::TreeCommand(command) => Ok(FrontendStatement::TreeCommand(command)),
2328                other => Err(ParseError::new(
2329                    format!("internal: TREE produced unexpected query kind {other:?}"),
2330                    self.position(),
2331                )),
2332            },
2333            Token::Ident(name) if name.eq_ignore_ascii_case("HLL") => {
2334                match self.parse_hll_command()? {
2335                    QueryExpr::ProbabilisticCommand(command) => {
2336                        Ok(FrontendStatement::ProbabilisticCommand(command))
2337                    }
2338                    other => Err(ParseError::new(
2339                        format!("internal: HLL produced unexpected query kind {other:?}"),
2340                        self.position(),
2341                    )),
2342                }
2343            }
2344            Token::Ident(name) if name.eq_ignore_ascii_case("SKETCH") => {
2345                match self.parse_sketch_command()? {
2346                    QueryExpr::ProbabilisticCommand(command) => {
2347                        Ok(FrontendStatement::ProbabilisticCommand(command))
2348                    }
2349                    other => Err(ParseError::new(
2350                        format!("internal: SKETCH produced unexpected query kind {other:?}"),
2351                        self.position(),
2352                    )),
2353                }
2354            }
2355            Token::Ident(name) if name.eq_ignore_ascii_case("FILTER") => {
2356                match self.parse_filter_command()? {
2357                    QueryExpr::ProbabilisticCommand(command) => {
2358                        Ok(FrontendStatement::ProbabilisticCommand(command))
2359                    }
2360                    other => Err(ParseError::new(
2361                        format!("internal: FILTER produced unexpected query kind {other:?}"),
2362                        self.position(),
2363                    )),
2364                }
2365            }
2366            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => self
2367                .parse_sql_command()
2368                .map(SqlCommand::into_statement)
2369                .map(FrontendStatement::Sql),
2370            other => Err(ParseError::expected(
2371                vec![
2372                    "SELECT", "MATCH", "PATH", "FROM", "VECTOR", "HYBRID", "INSERT", "UPDATE",
2373                    "DELETE", "TRUNCATE", "CREATE", "DROP", "ALTER", "GRAPH", "SEARCH", "ASK",
2374                    "QUEUE", "EVENTS", "KV", "HLL", "TREE", "SKETCH", "FILTER", "SET", "SHOW",
2375                    "RESET", "DESCRIBE", "DESC", "RANK", "ZRANK", "ZRANGE",
2376                ],
2377                other,
2378                self.position(),
2379            )),
2380        }
2381    }
2382
2383    fn parse_ranking_read(&mut self) -> Result<QueryExpr, ParseError> {
2384        let head = self.expect_ident()?;
2385        if head.eq_ignore_ascii_case("RANK") {
2386            return self.parse_rank_after_rank(false);
2387        }
2388        if head.eq_ignore_ascii_case("APPROX") || head.eq_ignore_ascii_case("APPROXIMATE") {
2389            if !self.consume_ident_ci("RANK")? {
2390                return Err(ParseError::expected(
2391                    vec!["RANK"],
2392                    self.peek(),
2393                    self.position(),
2394                ));
2395            }
2396            return self.parse_rank_after_rank(true);
2397        }
2398        if head.eq_ignore_ascii_case("ZRANK") {
2399            return self.parse_zrank();
2400        }
2401        if head.eq_ignore_ascii_case("ZRANGE") {
2402            return self.parse_zrange();
2403        }
2404        Err(ParseError::expected(
2405            vec!["RANK", "APPROX RANK", "ZRANK", "ZRANGE"],
2406            self.peek(),
2407            self.position(),
2408        ))
2409    }
2410
2411    fn parse_rank_after_rank(&mut self, approximate: bool) -> Result<QueryExpr, ParseError> {
2412        if self.consume(&Token::Of)? {
2413            let entity_id = self.parse_u64_slot("rank entity id")?;
2414            self.expect(Token::In)?;
2415            let ranking = self.expect_ident()?;
2416            let query = RankOfQuery { ranking, entity_id };
2417            return Ok(if approximate {
2418                QueryExpr::ApproxRankOf(query)
2419            } else {
2420                QueryExpr::RankOf(query)
2421            });
2422        }
2423
2424        if !approximate && self.consume(&Token::Range)? {
2425            let lo = self.parse_positive_u64_slot("rank range lower bound")?;
2426            self.expect(Token::To)?;
2427            let hi = self.parse_positive_u64_slot("rank range upper bound")?;
2428            if hi < lo {
2429                return Err(ParseError::value_out_of_range(
2430                    "rank range upper bound",
2431                    "must be greater than or equal to the lower bound",
2432                    self.position(),
2433                ));
2434            }
2435            self.expect(Token::In)?;
2436            let ranking = self.expect_ident()?;
2437            return Ok(QueryExpr::RankRange(RankRangeQuery { ranking, lo, hi }));
2438        }
2439
2440        Err(ParseError::expected(
2441            if approximate {
2442                vec!["OF"]
2443            } else {
2444                vec!["OF", "RANGE"]
2445            },
2446            self.peek(),
2447            self.position(),
2448        ))
2449    }
2450
2451    fn parse_zrank(&mut self) -> Result<QueryExpr, ParseError> {
2452        let ranking = self.expect_ident()?;
2453        let entity_id = self.parse_u64_slot("ZRANK entity id")?;
2454        Ok(QueryExpr::RankOf(RankOfQuery { ranking, entity_id }))
2455    }
2456
2457    fn parse_zrange(&mut self) -> Result<QueryExpr, ParseError> {
2458        let ranking = self.expect_ident()?;
2459        let start = self.parse_u64_slot("ZRANGE start")?;
2460        let stop = self.parse_u64_slot("ZRANGE stop")?;
2461        if stop < start {
2462            return Err(ParseError::value_out_of_range(
2463                "ZRANGE stop",
2464                "must be greater than or equal to start",
2465                self.position(),
2466            ));
2467        }
2468        let _with_scores = self.consume_ident_ci("WITHSCORES")?;
2469        Ok(QueryExpr::RankRange(RankRangeQuery {
2470            ranking,
2471            lo: start + 1,
2472            hi: stop + 1,
2473        }))
2474    }
2475
2476    fn parse_positive_u64_slot(&mut self, field: &'static str) -> Result<u64, ParseError> {
2477        let value = self.parse_u64_slot(field)?;
2478        if value == 0 {
2479            return Err(ParseError::value_out_of_range(
2480                field,
2481                "must be a positive integer",
2482                self.position(),
2483            ));
2484        }
2485        Ok(value)
2486    }
2487
2488    fn parse_u64_slot(&mut self, field: &'static str) -> Result<u64, ParseError> {
2489        let pos = self.position();
2490        if matches!(self.peek(), Token::Minus | Token::Dash) {
2491            return Err(ParseError::value_out_of_range(
2492                field,
2493                "must be an unsigned integer",
2494                pos,
2495            ));
2496        }
2497        let raw = self.parse_integer()?;
2498        u64::try_from(raw)
2499            .map_err(|_| ParseError::value_out_of_range(field, "must be an unsigned integer", pos))
2500    }
2501
2502    /// Parse any SQL/RQL-style command into the canonical SQL frontend IR.
2503    pub fn parse_sql_statement(&mut self) -> Result<SqlStatement, ParseError> {
2504        self.parse_sql_command().map(SqlCommand::into_statement)
2505    }
2506
2507    fn parse_dotted_admin_path(&mut self, lowercase: bool) -> Result<String, ParseError> {
2508        let mut path = self.expect_ident()?;
2509        while self.consume(&Token::Dot)? {
2510            let next = self.expect_ident_or_keyword()?;
2511            path = format!("{path}.{next}");
2512        }
2513        Ok(if lowercase {
2514            path.to_ascii_lowercase()
2515        } else {
2516            path
2517        })
2518    }
2519
2520    fn normalize_secret_admin_path(path: String) -> String {
2521        if let Some(rest) = path.strip_prefix("red.secrets.") {
2522            format!("red.secret.{rest}")
2523        } else if path == "red.secrets" {
2524            "red.secret".to_string()
2525        } else {
2526            path
2527        }
2528    }
2529
2530    /// Parse any SQL/RQL-style command through a single frontend module.
2531    /// Parse a `CREATE ...` statement. Split out of
2532    /// [`parse_sql_command`] so its very large per-arm locals
2533    /// (every CREATE variant's query struct) live in their own
2534    /// stack frame instead of inflating the dispatcher's frame.
2535    /// `parse_sql_command` recurses (CREATE VIEW ... AS <stmt>,
2536    /// nested subqueries), so a fat dispatcher frame stacked on
2537    /// itself overflowed small (2 MiB) worker-thread stacks (#635).
2538    #[inline(never)]
2539    fn parse_create_command(&mut self) -> Result<SqlCommand, ParseError> {
2540        let pos = self.position();
2541        self.advance()?;
2542
2543        // CREATE [OR REPLACE] [MATERIALIZED] VIEW [IF NOT EXISTS] name AS <select>
2544        // Detect the VIEW path early so OR REPLACE / MATERIALIZED modifiers
2545        // don't collide with other CREATE variants (TABLE, INDEX, etc.).
2546        let mut or_replace = false;
2547        if self.consume(&Token::Or)? || self.consume_ident_ci("OR")? {
2548            let _ = self.consume_ident_ci("REPLACE")?;
2549            or_replace = true;
2550        }
2551        let materialized = self.consume(&Token::Materialized)?;
2552        if self.check(&Token::View) {
2553            self.advance()?;
2554            let if_not_exists = self.match_if_not_exists()?;
2555            let name = self.expect_ident()?;
2556            // Issue #584 slice 12 — `WITH RETENTION <duration>`
2557            // on CREATE MATERIALIZED VIEW. Parsed before `AS`
2558            // so the SELECT body parser cannot consume the
2559            // trailing `WITH` for its own (TTL / METADATA /
2560            // …) clauses. Persisted on the view definition;
2561            // the physical sweep against view-backing rows
2562            // activates with the slice-9 row-storage follow-up.
2563            let mut retention_duration_ms: Option<u64> = None;
2564            if self.check(&Token::With) {
2565                self.advance()?;
2566                if !self.consume(&Token::Retention)? && !self.consume_ident_ci("RETENTION")? {
2567                    return Err(ParseError::expected(
2568                        vec!["RETENTION"],
2569                        self.peek(),
2570                        self.position(),
2571                    ));
2572                }
2573                if !materialized {
2574                    return Err(ParseError::new(
2575                        "WITH RETENTION is only valid on \
2576                                 CREATE MATERIALIZED VIEW"
2577                            .to_string(),
2578                        self.position(),
2579                    ));
2580                }
2581                let value = self.parse_float()?;
2582                let unit_mult = self.parse_duration_unit()?;
2583                retention_duration_ms = Some((value * unit_mult).round() as u64);
2584            }
2585            // Accept `AS` — the lexer promotes it to `Token::As`
2586            // (keyword) but some paths still see it as an ident.
2587            if !self.consume(&Token::As)? && !self.consume_ident_ci("AS")? {
2588                return Err(ParseError::expected(
2589                    vec!["AS"],
2590                    self.peek(),
2591                    self.position(),
2592                ));
2593            }
2594            // Recursive parse of the body. Any QueryExpr that the
2595            // rest of the grammar accepts is valid (Select, Join, etc.).
2596            let body = self.parse_sql_command()?.into_query_expr();
2597            // Optional `REFRESH EVERY <duration>` clause on
2598            // materialized views (issue #583 slice 10). The
2599            // background scheduler reads this off the view
2600            // descriptor and ticks the view on its cadence.
2601            let mut refresh_every_ms: Option<u64> = None;
2602            if self.check(&Token::Refresh) {
2603                if !materialized {
2604                    return Err(ParseError::new(
2605                        "REFRESH EVERY is only valid on \
2606                                 CREATE MATERIALIZED VIEW"
2607                            .to_string(),
2608                        self.position(),
2609                    ));
2610                }
2611                self.advance()?;
2612                if !self.consume_ident_ci("EVERY")? {
2613                    return Err(ParseError::expected(
2614                        vec!["EVERY"],
2615                        self.peek(),
2616                        self.position(),
2617                    ));
2618                }
2619                let value = self.parse_float()?;
2620                let unit_mult = self.parse_duration_unit()?;
2621                refresh_every_ms = Some((value * unit_mult).round() as u64);
2622            }
2623            return Ok(SqlCommand::CreateView(CreateViewQuery {
2624                name,
2625                query: Box::new(body),
2626                materialized,
2627                if_not_exists,
2628                or_replace,
2629                refresh_every_ms,
2630                retention_duration_ms,
2631            }));
2632        }
2633        // If OR REPLACE / MATERIALIZED was consumed but VIEW was not,
2634        // bail out — no other CREATE form accepts those modifiers.
2635        if or_replace || materialized {
2636            return Err(ParseError::expected(
2637                vec!["VIEW"],
2638                self.peek(),
2639                self.position(),
2640            ));
2641        }
2642
2643        if matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("USER")) {
2644            let stmt = self.parse_create_user_statement()?;
2645            Ok(SqlCommand::CreateUser(stmt))
2646        } else if self.check(&Token::Index) || self.check(&Token::Unique) {
2647            match self.parse_create_index_query()? {
2648                QueryExpr::CreateIndex(query) => Ok(SqlCommand::CreateIndex(query)),
2649                other => Err(ParseError::new(
2650                    format!("internal: CREATE INDEX produced unexpected kind {other:?}"),
2651                    self.position(),
2652                )),
2653            }
2654        } else if self.check(&Token::Table) {
2655            self.expect(Token::Table)?;
2656            match self.parse_create_table_body()? {
2657                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
2658                other => Err(ParseError::new(
2659                    format!("internal: CREATE TABLE produced unexpected kind {other:?}"),
2660                    self.position(),
2661                )),
2662            }
2663        } else if self.check(&Token::Graph) {
2664            self.advance()?;
2665            match self.parse_create_collection_model_body(CollectionModel::Graph)? {
2666                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
2667                other => Err(ParseError::new(
2668                    format!("internal: CREATE GRAPH produced unexpected kind {other:?}"),
2669                    self.position(),
2670                )),
2671            }
2672        } else if self.check(&Token::Document) {
2673            self.advance()?;
2674            match self.parse_create_collection_model_body(CollectionModel::Document)? {
2675                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
2676                other => Err(ParseError::new(
2677                    format!("internal: CREATE DOCUMENT produced unexpected kind {other:?}"),
2678                    self.position(),
2679                )),
2680            }
2681        } else if self.check(&Token::Vector) {
2682            self.advance()?;
2683            match self.parse_create_vector_body()? {
2684                QueryExpr::CreateVector(query) => Ok(SqlCommand::CreateVector(query)),
2685                other => Err(ParseError::new(
2686                    format!("internal: CREATE VECTOR produced unexpected kind {other:?}"),
2687                    self.position(),
2688                )),
2689            }
2690        } else if self.check(&Token::Collection) {
2691            self.advance()?;
2692            match self.parse_create_collection_body()? {
2693                QueryExpr::CreateCollection(query) => Ok(SqlCommand::CreateCollection(query)),
2694                other => Err(ParseError::new(
2695                    format!("internal: CREATE COLLECTION produced unexpected kind {other:?}"),
2696                    self.position(),
2697                )),
2698            }
2699        } else if self.check(&Token::Kv) {
2700            self.advance()?;
2701            match self.parse_create_keyed_body(CollectionModel::Kv)? {
2702                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
2703                other => Err(ParseError::new(
2704                    format!("internal: CREATE KV produced unexpected kind {other:?}"),
2705                    self.position(),
2706                )),
2707            }
2708        } else if self.consume_ident_ci("CONFIG")? {
2709            match self.parse_create_keyed_body(CollectionModel::Config)? {
2710                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
2711                other => Err(ParseError::new(
2712                    format!("internal: CREATE CONFIG produced unexpected kind {other:?}"),
2713                    self.position(),
2714                )),
2715            }
2716        } else if self.consume_ident_ci("VAULT")? {
2717            match self.parse_create_keyed_body(CollectionModel::Vault)? {
2718                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
2719                other => Err(ParseError::new(
2720                    format!("internal: CREATE VAULT produced unexpected kind {other:?}"),
2721                    self.position(),
2722                )),
2723            }
2724        } else if self.check(&Token::Timeseries) {
2725            self.advance()?;
2726            match self.parse_create_timeseries_body()? {
2727                QueryExpr::CreateTimeSeries(query) => Ok(SqlCommand::CreateTimeSeries(query)),
2728                other => Err(ParseError::new(
2729                    format!("internal: CREATE TIMESERIES produced unexpected kind {other:?}"),
2730                    self.position(),
2731                )),
2732            }
2733        } else if self.check(&Token::Metric) {
2734            self.advance()?;
2735            match self.parse_create_metric_body()? {
2736                QueryExpr::CreateMetric(query) => Ok(SqlCommand::CreateMetric(query)),
2737                other => Err(ParseError::new(
2738                    format!("internal: CREATE METRIC produced unexpected kind {other:?}"),
2739                    self.position(),
2740                )),
2741            }
2742        } else if self.consume_ident_ci("METRICS")? {
2743            match self.parse_create_metrics_body()? {
2744                QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
2745                other => Err(ParseError::new(
2746                    format!("internal: CREATE METRICS produced unexpected kind {other:?}"),
2747                    self.position(),
2748                )),
2749            }
2750        } else if self.consume_ident_ci("SLO")? {
2751            match self.parse_create_slo_body()? {
2752                QueryExpr::CreateSlo(query) => Ok(SqlCommand::CreateSlo(query)),
2753                other => Err(ParseError::new(
2754                    format!("internal: CREATE SLO produced unexpected kind {other:?}"),
2755                    self.position(),
2756                )),
2757            }
2758        } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE")) {
2759            self.advance()?;
2760            match self.parse_create_hypertable_body()? {
2761                QueryExpr::CreateTimeSeries(query) => Ok(SqlCommand::CreateTimeSeries(query)),
2762                other => Err(ParseError::new(
2763                    format!("internal: CREATE HYPERTABLE produced unexpected kind {other:?}"),
2764                    self.position(),
2765                )),
2766            }
2767        } else if self.check(&Token::Queue) {
2768            self.advance()?;
2769            match self.parse_create_queue_body()? {
2770                QueryExpr::CreateQueue(query) => Ok(SqlCommand::CreateQueue(query)),
2771                other => Err(ParseError::new(
2772                    format!("internal: CREATE QUEUE produced unexpected kind {other:?}"),
2773                    self.position(),
2774                )),
2775            }
2776        } else if self.check(&Token::Tree) {
2777            self.advance()?;
2778            match self.parse_create_tree_body()? {
2779                QueryExpr::CreateTree(query) => Ok(SqlCommand::CreateTree(query)),
2780                other => Err(ParseError::new(
2781                    format!("internal: CREATE TREE produced unexpected kind {other:?}"),
2782                    self.position(),
2783                )),
2784            }
2785        } else if matches!(self.peek(), Token::Ident(n) if
2786                    n.eq_ignore_ascii_case("HLL") ||
2787                    n.eq_ignore_ascii_case("SKETCH") ||
2788                    n.eq_ignore_ascii_case("FILTER"))
2789        {
2790            match self.parse_create_probabilistic()? {
2791                QueryExpr::ProbabilisticCommand(command) => Ok(SqlCommand::Probabilistic(command)),
2792                other => Err(ParseError::new(
2793                    format!("internal: CREATE probabilistic produced unexpected kind {other:?}"),
2794                    self.position(),
2795                )),
2796            }
2797        } else if self.check(&Token::Schema) {
2798            // CREATE SCHEMA [IF NOT EXISTS] name
2799            self.advance()?;
2800            let if_not_exists = self.match_if_not_exists()?;
2801            let name = self.expect_ident()?;
2802            Ok(SqlCommand::CreateSchema(CreateSchemaQuery {
2803                name,
2804                if_not_exists,
2805            }))
2806        } else if self.check(&Token::Policy) {
2807            // Two forms share the leading `CREATE POLICY` tokens:
2808            //   * IAM:   CREATE POLICY '<id>' AS '<json>'          (string literal id)
2809            //   * RLS:   CREATE POLICY <name> ON <target> ...      (bare ident name)
2810            // Disambiguate by peeking the token after POLICY.
2811            self.advance()?;
2812            if matches!(self.peek(), Token::String(_)) {
2813                // IAM form — short-circuit out of the SQL command stack.
2814                let expr = self.parse_create_iam_policy_after_keywords()?;
2815                // Inline command-wrapping: produce a synthetic SqlCommand by
2816                // routing through a generic IAM admin holder. We don't
2817                // have a dedicated SqlCommand variant for IAM yet, so we
2818                // bounce through the existing Grant-shaped Admin slot
2819                // which expects no further tokens.
2820                return Ok(SqlCommand::IamPolicy(expr));
2821            }
2822            let name = self.expect_ident()?;
2823            self.expect(Token::On)?;
2824
2825            let (target_kind, table) = {
2826                use crate::ast::PolicyTargetKind;
2827                let kw = match self.peek() {
2828                    Token::Ident(s) => Some(s.to_ascii_uppercase()),
2829                    _ => None,
2830                };
2831                let kind = kw.as_deref().and_then(|k| match k {
2832                    "NODES" => Some(PolicyTargetKind::Nodes),
2833                    "EDGES" => Some(PolicyTargetKind::Edges),
2834                    "VECTORS" => Some(PolicyTargetKind::Vectors),
2835                    "MESSAGES" => Some(PolicyTargetKind::Messages),
2836                    "POINTS" => Some(PolicyTargetKind::Points),
2837                    "DOCUMENTS" => Some(PolicyTargetKind::Documents),
2838                    _ => None,
2839                });
2840                if let Some(k) = kind {
2841                    self.advance()?;
2842                    self.expect(Token::Of)?;
2843                    let coll = self.expect_ident()?;
2844                    (k, coll)
2845                } else {
2846                    let coll = self.expect_ident()?;
2847                    (PolicyTargetKind::Table, coll)
2848                }
2849            };
2850
2851            let action = if self.consume(&Token::For)? {
2852                let a = match self.peek() {
2853                    Token::Select => {
2854                        self.advance()?;
2855                        Some(PolicyAction::Select)
2856                    }
2857                    Token::Insert => {
2858                        self.advance()?;
2859                        Some(PolicyAction::Insert)
2860                    }
2861                    Token::Update => {
2862                        self.advance()?;
2863                        Some(PolicyAction::Update)
2864                    }
2865                    Token::Delete => {
2866                        self.advance()?;
2867                        Some(PolicyAction::Delete)
2868                    }
2869                    Token::All => {
2870                        self.advance()?;
2871                        None
2872                    }
2873                    _ => None,
2874                };
2875                a
2876            } else {
2877                None
2878            };
2879
2880            let role = if self.consume(&Token::To)? {
2881                Some(self.expect_ident()?)
2882            } else {
2883                None
2884            };
2885
2886            self.expect(Token::Using)?;
2887            self.expect(Token::LParen)?;
2888            let filter = self.parse_filter()?;
2889            self.expect(Token::RParen)?;
2890
2891            Ok(SqlCommand::CreatePolicy(CreatePolicyQuery {
2892                name,
2893                table,
2894                action,
2895                role,
2896                using: Box::new(filter),
2897                target_kind,
2898            }))
2899        } else if self.check(&Token::Server) {
2900            // CREATE SERVER [IF NOT EXISTS] name
2901            //   FOREIGN DATA WRAPPER kind
2902            //   [OPTIONS (key 'value', ...)]
2903            self.advance()?;
2904            let if_not_exists = self.match_if_not_exists()?;
2905            let name = self.expect_ident()?;
2906            self.expect(Token::Foreign)?;
2907            self.expect(Token::Data)?;
2908            self.expect(Token::Wrapper)?;
2909            let wrapper = self.expect_ident()?;
2910            let options = self.parse_fdw_options_clause()?;
2911            Ok(SqlCommand::CreateServer(CreateServerQuery {
2912                name,
2913                wrapper,
2914                options,
2915                if_not_exists,
2916            }))
2917        } else if self.check(&Token::Foreign) {
2918            // CREATE FOREIGN TABLE [IF NOT EXISTS] name (cols)
2919            //   SERVER server_name
2920            //   [OPTIONS (key 'value', ...)]
2921            self.advance()?;
2922            self.expect(Token::Table)?;
2923            let if_not_exists = self.match_if_not_exists()?;
2924            let name = self.expect_ident()?;
2925            self.expect(Token::LParen)?;
2926            let mut columns = Vec::new();
2927            loop {
2928                let col_name = self.expect_ident()?;
2929                let data_type = self.expect_ident_or_keyword()?;
2930                // Inline NOT NULL check — the CREATE TABLE path's helper is
2931                // private and coupling to it just for FDW columns isn't worth it.
2932                let mut not_null = false;
2933                if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NOT")) {
2934                    self.advance()?;
2935                    if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NULL")) {
2936                        self.advance()?;
2937                        not_null = true;
2938                    }
2939                }
2940                columns.push(ForeignColumnDef {
2941                    name: col_name,
2942                    data_type,
2943                    not_null,
2944                });
2945                if !self.consume(&Token::Comma)? {
2946                    break;
2947                }
2948            }
2949            self.expect(Token::RParen)?;
2950            self.expect(Token::Server)?;
2951            let server = self.expect_ident()?;
2952            let options = self.parse_fdw_options_clause()?;
2953            Ok(SqlCommand::CreateForeignTable(CreateForeignTableQuery {
2954                name,
2955                server,
2956                columns,
2957                options,
2958                if_not_exists,
2959            }))
2960        } else if self.check(&Token::Sequence) {
2961            // CREATE SEQUENCE [IF NOT EXISTS] name
2962            //   [START [WITH] n] [INCREMENT [BY] n]
2963            self.advance()?;
2964            let if_not_exists = self.match_if_not_exists()?;
2965            let name = self.expect_ident()?;
2966            let mut start: i64 = 1;
2967            let mut increment: i64 = 1;
2968            // Loop over optional clauses in any order.
2969            loop {
2970                if self.consume(&Token::Start)? {
2971                    // Accept `START 100` or `START WITH 100`.
2972                    let _ = self.consume(&Token::With)? || self.consume_ident_ci("WITH")?;
2973                    start = self.parse_integer()?;
2974                } else if self.consume(&Token::Increment)? {
2975                    // Accept `INCREMENT 5` or `INCREMENT BY 5`.
2976                    let _ = self.consume(&Token::By)? || self.consume_ident_ci("BY")?;
2977                    increment = self.parse_integer()?;
2978                } else {
2979                    break;
2980                }
2981            }
2982            Ok(SqlCommand::CreateSequence(CreateSequenceQuery {
2983                name,
2984                if_not_exists,
2985                start,
2986                increment,
2987            }))
2988        } else if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
2989            self.advance()?; // consume MIGRATION
2990            match self.parse_create_migration_body()? {
2991                QueryExpr::CreateMigration(q) => Ok(SqlCommand::CreateMigration(q)),
2992                other => Err(ParseError::new(
2993                    format!("internal: CREATE MIGRATION produced unexpected kind {other:?}"),
2994                    self.position(),
2995                )),
2996            }
2997        } else if let Some(reason) = analytics_v0_non_goal_create(self.peek()) {
2998            // Issue #789 — enforce Analytics v0 non-goals at the parser
2999            // surface. The parent PRD (#782) explicitly excludes generic
3000            // analytics objects, a new event storage model, cohorts,
3001            // funnels, SLA contracts, and adapters from v0. Reject these
3002            // CREATE forms here with a stable, non-goal-specific message
3003            // so accidental use surfaces an obvious "out of scope for v0"
3004            // error rather than the generic CREATE fallback.
3005            Err(ParseError::new(reason, self.position()))
3006        } else if let Some(err) =
3007            ParseError::unsupported_recognized_token(self.peek(), self.position())
3008        {
3009            Err(err)
3010        } else {
3011            Err(ParseError::expected(
3012                vec![
3013                    "TABLE",
3014                    "GRAPH",
3015                    "VECTOR",
3016                    "DOCUMENT",
3017                    "KV",
3018                    "COLLECTION",
3019                    "INDEX",
3020                    "UNIQUE",
3021                    "METRIC",
3022                    "TIMESERIES",
3023                    "QUEUE",
3024                    "TREE",
3025                    "HLL",
3026                    "SKETCH",
3027                    "FILTER",
3028                    "SCHEMA",
3029                    "SEQUENCE",
3030                    "USER",
3031                    "MIGRATION",
3032                ],
3033                self.peek(),
3034                pos,
3035            ))
3036        }
3037    }
3038
3039    pub fn parse_sql_command(&mut self) -> Result<SqlCommand, ParseError> {
3040        match self.peek() {
3041            Token::Select => match self.parse_select_query()? {
3042                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
3043                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
3044                other => Err(ParseError::new(
3045                    format!("internal: SELECT produced unexpected query kind {other:?}"),
3046                    self.position(),
3047                )),
3048            },
3049            Token::From => match self.parse_from_query()? {
3050                QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
3051                QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
3052                other => Err(ParseError::new(
3053                    format!("internal: FROM produced unexpected query kind {other:?}"),
3054                    self.position(),
3055                )),
3056            },
3057            Token::Insert => match self.parse_insert_query()? {
3058                QueryExpr::Insert(query) => Ok(SqlCommand::Insert(query)),
3059                other => Err(ParseError::new(
3060                    format!("internal: INSERT produced unexpected query kind {other:?}"),
3061                    self.position(),
3062                )),
3063            },
3064            Token::Update => match self.parse_update_query()? {
3065                QueryExpr::Update(query) => Ok(SqlCommand::Update(query)),
3066                other => Err(ParseError::new(
3067                    format!("internal: UPDATE produced unexpected query kind {other:?}"),
3068                    self.position(),
3069                )),
3070            },
3071            Token::Delete => {
3072                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("SECRET"))
3073                {
3074                    self.advance()?; // DELETE
3075                    self.advance()?; // SECRET
3076                    let key =
3077                        Self::normalize_secret_admin_path(self.parse_dotted_admin_path(true)?);
3078                    Ok(SqlCommand::DeleteSecret { key })
3079                } else {
3080                    match self.parse_delete_query()? {
3081                        QueryExpr::Delete(query) => Ok(SqlCommand::Delete(query)),
3082                        other => Err(ParseError::new(
3083                            format!("internal: DELETE produced unexpected query kind {other:?}"),
3084                            self.position(),
3085                        )),
3086                    }
3087                }
3088            }
3089            Token::Truncate => {
3090                self.advance()?;
3091                let model = if self.consume(&Token::Table)? {
3092                    Some(CollectionModel::Table)
3093                } else if self.consume(&Token::Graph)? {
3094                    Some(CollectionModel::Graph)
3095                } else if self.consume(&Token::Vector)? {
3096                    Some(CollectionModel::Vector)
3097                } else if self.consume(&Token::Document)? {
3098                    Some(CollectionModel::Document)
3099                } else if self.consume(&Token::Timeseries)? {
3100                    Some(CollectionModel::TimeSeries)
3101                } else if self.consume_ident_ci("METRICS")? {
3102                    Some(CollectionModel::Metrics)
3103                } else if self.consume(&Token::Kv)? {
3104                    Some(CollectionModel::Kv)
3105                } else if self.consume(&Token::Queue)? {
3106                    Some(CollectionModel::Queue)
3107                } else if self.consume(&Token::Collection)? {
3108                    None
3109                } else {
3110                    return Err(ParseError::expected(
3111                        vec![
3112                            "TABLE",
3113                            "GRAPH",
3114                            "VECTOR",
3115                            "DOCUMENT",
3116                            "TIMESERIES",
3117                            "METRICS",
3118                            "KV",
3119                            "QUEUE",
3120                            "COLLECTION",
3121                        ],
3122                        self.peek(),
3123                        self.position(),
3124                    ));
3125                };
3126                match self.parse_truncate_body(model)? {
3127                    QueryExpr::Truncate(query) => Ok(SqlCommand::Truncate(query)),
3128                    other => Err(ParseError::new(
3129                        format!("internal: TRUNCATE produced unexpected kind {other:?}"),
3130                        self.position(),
3131                    )),
3132                }
3133            }
3134            Token::Explain => {
3135                // Peek ahead: EXPLAIN MIGRATION name → ExplainMigration
3136                // EXPLAIN ALTER FOR ... → ExplainAlter (existing path)
3137                if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
3138                {
3139                    self.advance()?; // consume EXPLAIN
3140                    match self.parse_explain_migration_after_keyword()? {
3141                        QueryExpr::ExplainMigration(q) => Ok(SqlCommand::ExplainMigration(q)),
3142                        other => Err(ParseError::new(
3143                            format!(
3144                                "internal: EXPLAIN MIGRATION produced unexpected kind {other:?}"
3145                            ),
3146                            self.position(),
3147                        )),
3148                    }
3149                } else {
3150                    match self.parse_explain_alter_query()? {
3151                        QueryExpr::ExplainAlter(query) => Ok(SqlCommand::ExplainAlter(query)),
3152                        other => Err(ParseError::new(
3153                            format!("internal: EXPLAIN produced unexpected query kind {other:?}"),
3154                            self.position(),
3155                        )),
3156                    }
3157                }
3158            }
3159            Token::Create => self.parse_create_command(),
3160            Token::Drop => {
3161                let pos = self.position();
3162                self.advance()?;
3163
3164                // DROP [MATERIALIZED] VIEW [IF EXISTS] name
3165                let materialized = self.consume(&Token::Materialized)?;
3166                if self.check(&Token::View) {
3167                    self.advance()?;
3168                    let if_exists = self.match_if_exists()?;
3169                    let name = self.expect_ident()?;
3170                    return Ok(SqlCommand::DropView(DropViewQuery {
3171                        name,
3172                        materialized,
3173                        if_exists,
3174                    }));
3175                }
3176                if materialized {
3177                    return Err(ParseError::expected(
3178                        vec!["VIEW"],
3179                        self.peek(),
3180                        self.position(),
3181                    ));
3182                }
3183
3184                if self.check(&Token::Index) {
3185                    match self.parse_drop_index_query()? {
3186                        QueryExpr::DropIndex(query) => Ok(SqlCommand::DropIndex(query)),
3187                        other => Err(ParseError::new(
3188                            format!("internal: DROP INDEX produced unexpected kind {other:?}"),
3189                            self.position(),
3190                        )),
3191                    }
3192                } else if self.check(&Token::Table) {
3193                    self.expect(Token::Table)?;
3194                    match self.parse_drop_table_body()? {
3195                        QueryExpr::DropTable(query) => Ok(SqlCommand::DropTable(query)),
3196                        other => Err(ParseError::new(
3197                            format!("internal: DROP TABLE produced unexpected kind {other:?}"),
3198                            self.position(),
3199                        )),
3200                    }
3201                } else if self.check(&Token::Graph) {
3202                    self.advance()?;
3203                    match self.parse_drop_graph_body()? {
3204                        QueryExpr::DropGraph(query) => Ok(SqlCommand::DropGraph(query)),
3205                        other => Err(ParseError::new(
3206                            format!("internal: DROP GRAPH produced unexpected kind {other:?}"),
3207                            self.position(),
3208                        )),
3209                    }
3210                } else if self.check(&Token::Vector) {
3211                    self.advance()?;
3212                    match self.parse_drop_vector_body()? {
3213                        QueryExpr::DropVector(query) => Ok(SqlCommand::DropVector(query)),
3214                        other => Err(ParseError::new(
3215                            format!("internal: DROP VECTOR produced unexpected kind {other:?}"),
3216                            self.position(),
3217                        )),
3218                    }
3219                } else if self.check(&Token::Document) {
3220                    self.advance()?;
3221                    match self.parse_drop_document_body()? {
3222                        QueryExpr::DropDocument(query) => Ok(SqlCommand::DropDocument(query)),
3223                        other => Err(ParseError::new(
3224                            format!("internal: DROP DOCUMENT produced unexpected kind {other:?}"),
3225                            self.position(),
3226                        )),
3227                    }
3228                } else if self.check(&Token::Kv) {
3229                    self.advance()?;
3230                    match self.parse_drop_kv_body()? {
3231                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
3232                        other => Err(ParseError::new(
3233                            format!("internal: DROP KV produced unexpected kind {other:?}"),
3234                            self.position(),
3235                        )),
3236                    }
3237                } else if self.consume_ident_ci("CONFIG")? {
3238                    match self.parse_drop_keyed_body(CollectionModel::Config)? {
3239                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
3240                        other => Err(ParseError::new(
3241                            format!("internal: DROP CONFIG produced unexpected kind {other:?}"),
3242                            self.position(),
3243                        )),
3244                    }
3245                } else if self.consume_ident_ci("VAULT")? {
3246                    match self.parse_drop_keyed_body(CollectionModel::Vault)? {
3247                        QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
3248                        other => Err(ParseError::new(
3249                            format!("internal: DROP VAULT produced unexpected kind {other:?}"),
3250                            self.position(),
3251                        )),
3252                    }
3253                } else if self.check(&Token::Collection) {
3254                    self.advance()?;
3255                    match self.parse_drop_collection_body()? {
3256                        QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
3257                        other => Err(ParseError::new(
3258                            format!("internal: DROP COLLECTION produced unexpected kind {other:?}"),
3259                            self.position(),
3260                        )),
3261                    }
3262                } else if self.check(&Token::Timeseries) {
3263                    self.advance()?;
3264                    match self.parse_drop_timeseries_body()? {
3265                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
3266                        other => Err(ParseError::new(
3267                            format!("internal: DROP TIMESERIES produced unexpected kind {other:?}"),
3268                            self.position(),
3269                        )),
3270                    }
3271                } else if self.consume_ident_ci("METRICS")? {
3272                    match self.parse_drop_collection_model_body(Some(CollectionModel::Metrics))? {
3273                        QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
3274                        other => Err(ParseError::new(
3275                            format!("internal: DROP METRICS produced unexpected kind {other:?}"),
3276                            self.position(),
3277                        )),
3278                    }
3279                } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
3280                {
3281                    // DROP HYPERTABLE name reuses the same AST as
3282                    // DROP TIMESERIES — runtime clears the registry
3283                    // entry *and* drops the backing collection.
3284                    self.advance()?;
3285                    match self.parse_drop_timeseries_body()? {
3286                        QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
3287                        other => Err(ParseError::new(
3288                            format!("internal: DROP HYPERTABLE produced unexpected kind {other:?}"),
3289                            self.position(),
3290                        )),
3291                    }
3292                } else if self.check(&Token::Queue) {
3293                    self.advance()?;
3294                    match self.parse_drop_queue_body()? {
3295                        QueryExpr::DropQueue(query) => Ok(SqlCommand::DropQueue(query)),
3296                        other => Err(ParseError::new(
3297                            format!("internal: DROP QUEUE produced unexpected kind {other:?}"),
3298                            self.position(),
3299                        )),
3300                    }
3301                } else if self.check(&Token::Tree) {
3302                    self.advance()?;
3303                    match self.parse_drop_tree_body()? {
3304                        QueryExpr::DropTree(query) => Ok(SqlCommand::DropTree(query)),
3305                        other => Err(ParseError::new(
3306                            format!("internal: DROP TREE produced unexpected kind {other:?}"),
3307                            self.position(),
3308                        )),
3309                    }
3310                } else if matches!(self.peek(), Token::Ident(n) if
3311                    n.eq_ignore_ascii_case("HLL") ||
3312                    n.eq_ignore_ascii_case("SKETCH") ||
3313                    n.eq_ignore_ascii_case("FILTER"))
3314                {
3315                    match self.parse_drop_probabilistic()? {
3316                        QueryExpr::ProbabilisticCommand(command) => {
3317                            Ok(SqlCommand::Probabilistic(command))
3318                        }
3319                        other => Err(ParseError::new(
3320                            format!(
3321                                "internal: DROP probabilistic produced unexpected kind {other:?}"
3322                            ),
3323                            self.position(),
3324                        )),
3325                    }
3326                } else if self.check(&Token::Schema) {
3327                    // DROP SCHEMA [IF EXISTS] name [CASCADE]
3328                    self.advance()?;
3329                    let if_exists = self.match_if_exists()?;
3330                    let name = self.expect_ident()?;
3331                    let cascade = self.consume(&Token::Cascade)?;
3332                    Ok(SqlCommand::DropSchema(DropSchemaQuery {
3333                        name,
3334                        if_exists,
3335                        cascade,
3336                    }))
3337                } else if self.check(&Token::Policy) {
3338                    // Two forms:
3339                    //   * IAM:   DROP POLICY '<id>'
3340                    //   * RLS:   DROP POLICY [IF EXISTS] name ON table
3341                    self.advance()?;
3342                    if matches!(self.peek(), Token::String(_)) {
3343                        let expr = self.parse_drop_iam_policy_after_keywords()?;
3344                        return Ok(SqlCommand::IamPolicy(expr));
3345                    }
3346                    let if_exists = self.match_if_exists()?;
3347                    let name = self.expect_ident()?;
3348                    self.expect(Token::On)?;
3349                    let table = self.expect_ident()?;
3350                    Ok(SqlCommand::DropPolicy(DropPolicyQuery {
3351                        name,
3352                        table,
3353                        if_exists,
3354                    }))
3355                } else if self.check(&Token::Server) {
3356                    // DROP SERVER [IF EXISTS] name [CASCADE]
3357                    self.advance()?;
3358                    let if_exists = self.match_if_exists()?;
3359                    let name = self.expect_ident()?;
3360                    let cascade = self.consume(&Token::Cascade)?;
3361                    Ok(SqlCommand::DropServer(DropServerQuery {
3362                        name,
3363                        if_exists,
3364                        cascade,
3365                    }))
3366                } else if self.check(&Token::Foreign) {
3367                    // DROP FOREIGN TABLE [IF EXISTS] name
3368                    self.advance()?;
3369                    self.expect(Token::Table)?;
3370                    let if_exists = self.match_if_exists()?;
3371                    let name = self.expect_ident()?;
3372                    Ok(SqlCommand::DropForeignTable(DropForeignTableQuery {
3373                        name,
3374                        if_exists,
3375                    }))
3376                } else if self.check(&Token::Sequence) {
3377                    // DROP SEQUENCE [IF EXISTS] name
3378                    self.advance()?;
3379                    let if_exists = self.match_if_exists()?;
3380                    let name = self.expect_ident()?;
3381                    Ok(SqlCommand::DropSequence(DropSequenceQuery {
3382                        name,
3383                        if_exists,
3384                    }))
3385                } else if let Some(err) =
3386                    ParseError::unsupported_recognized_token(self.peek(), self.position())
3387                {
3388                    Err(err)
3389                } else {
3390                    Err(ParseError::expected(
3391                        vec![
3392                            "TABLE",
3393                            "INDEX",
3394                            "TIMESERIES",
3395                            "QUEUE",
3396                            "TREE",
3397                            "HLL",
3398                            "SKETCH",
3399                            "FILTER",
3400                            "SCHEMA",
3401                            "SEQUENCE",
3402                        ],
3403                        self.peek(),
3404                        pos,
3405                    ))
3406                }
3407            }
3408            Token::Alter => {
3409                // Disambiguate ALTER USER / ALTER QUEUE / ALTER TABLE without
3410                // committing to a path until we've seen the target.
3411                // We peek the *next* token (without consuming) and
3412                // dispatch accordingly.
3413                let next = self.peek_next()?.clone();
3414                if matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("USER")) {
3415                    self.advance()?; // consume ALTER
3416                    let stmt = self.parse_alter_user_statement()?;
3417                    Ok(SqlCommand::AlterUser(stmt))
3418                } else if matches!(next, Token::Queue) {
3419                    self.advance()?; // consume ALTER
3420                    self.advance()?; // consume QUEUE
3421                    match self.parse_alter_queue_body()? {
3422                        QueryExpr::AlterQueue(query) => Ok(SqlCommand::AlterQueue(query)),
3423                        other => Err(ParseError::new(
3424                            format!("internal: ALTER QUEUE produced unexpected kind {other:?}"),
3425                            self.position(),
3426                        )),
3427                    }
3428                } else if matches!(next, Token::Metric) {
3429                    self.advance()?; // consume ALTER
3430                    self.advance()?; // consume METRIC
3431                    match self.parse_alter_metric_body()? {
3432                        QueryExpr::AlterMetric(query) => Ok(SqlCommand::AlterMetric(query)),
3433                        other => Err(ParseError::new(
3434                            format!("internal: ALTER METRIC produced unexpected kind {other:?}"),
3435                            self.position(),
3436                        )),
3437                    }
3438                } else if matches!(next, Token::Graph) {
3439                    // Issue #801 — `ALTER GRAPH name ADD|DROP ANALYTICS ...`
3440                    // shares the AlterTable AST so analytics-config lifecycle
3441                    // mutations dispatch through the existing executor path.
3442                    match self.parse_alter_graph_query()? {
3443                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
3444                        other => Err(ParseError::new(
3445                            format!(
3446                                "internal: ALTER GRAPH produced unexpected query kind {other:?}"
3447                            ),
3448                            self.position(),
3449                        )),
3450                    }
3451                } else if matches!(next, Token::Table)
3452                    || matches!(next, Token::Collection)
3453                    || matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("COLLECTION"))
3454                {
3455                    // Issue #522 — `ALTER COLLECTION` shares the AlterTable
3456                    // AST so signer-registry mutations dispatch through the
3457                    // existing executor. The DDL parser body accepts either
3458                    // keyword interchangeably for the open-vocabulary alters
3459                    // we own (currently `ADD|REVOKE SIGNER`).
3460                    match self.parse_alter_table_query()? {
3461                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
3462                        other => Err(ParseError::new(
3463                            format!(
3464                                "internal: ALTER TABLE produced unexpected query kind {other:?}"
3465                            ),
3466                            self.position(),
3467                        )),
3468                    }
3469                } else if let Some(err) =
3470                    ParseError::unsupported_recognized_token(&next, self.position())
3471                {
3472                    Err(err)
3473                } else {
3474                    match self.parse_alter_table_query()? {
3475                        QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
3476                        other => Err(ParseError::new(
3477                            format!("internal: ALTER produced unexpected query kind {other:?}"),
3478                            self.position(),
3479                        )),
3480                    }
3481                }
3482            }
3483            Token::Ident(name) if name.eq_ignore_ascii_case("GRANT") => {
3484                let stmt = self.parse_grant_statement()?;
3485                Ok(SqlCommand::Grant(stmt))
3486            }
3487            Token::Ident(name) if name.eq_ignore_ascii_case("REVOKE") => {
3488                let stmt = self.parse_revoke_statement()?;
3489                Ok(SqlCommand::Revoke(stmt))
3490            }
3491            Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
3492                self.advance()?;
3493                if self.consume_ident_ci("BACKFILL")? {
3494                    return Err(ParseError::new(
3495                        "EVENTS BACKFILL STATUS is not implemented; EVENTS BACKFILL runtime is available but durable progress tracking is not"
3496                            .to_string(),
3497                        self.position(),
3498                    ));
3499                }
3500                if !self.consume_ident_ci("STATUS")? {
3501                    return Err(ParseError::expected(
3502                        vec!["STATUS"],
3503                        self.peek(),
3504                        self.position(),
3505                    ));
3506                }
3507
3508                let mut query = TableQuery::new("red.subscriptions");
3509                let collection = match self.peek().clone() {
3510                    Token::Ident(name) => {
3511                        self.advance()?;
3512                        Some(name)
3513                    }
3514                    Token::String(name) => {
3515                        self.advance()?;
3516                        Some(name)
3517                    }
3518                    _ => None,
3519                };
3520                self.parse_table_clauses(&mut query)?;
3521                if let Some(collection) = collection {
3522                    let filter = Filter::compare(
3523                        FieldRef::column("red.subscriptions", "collection"),
3524                        CompareOp::Eq,
3525                        Value::text(collection),
3526                    );
3527                    let expr = filter_to_expr(&filter);
3528                    query.where_expr = Some(match query.where_expr.take() {
3529                        Some(existing) => Expr::binop(BinOp::And, existing, expr),
3530                        None => expr,
3531                    });
3532                    query.filter = Some(match query.filter.take() {
3533                        Some(existing) => existing.and(filter),
3534                        None => filter,
3535                    });
3536                }
3537                Ok(SqlCommand::Select(query))
3538            }
3539            Token::Attach => {
3540                let expr = self.parse_attach_policy()?;
3541                Ok(SqlCommand::IamPolicy(expr))
3542            }
3543            Token::Detach => {
3544                let expr = self.parse_detach_policy()?;
3545                Ok(SqlCommand::IamPolicy(expr))
3546            }
3547            Token::Ident(name) if name.eq_ignore_ascii_case("SIMULATE") => {
3548                let expr = self.parse_simulate_policy()?;
3549                Ok(SqlCommand::IamPolicy(expr))
3550            }
3551            Token::Ident(name) if name.eq_ignore_ascii_case("LINT") => {
3552                let expr = self.parse_lint_policy()?;
3553                Ok(SqlCommand::IamPolicy(expr))
3554            }
3555            Token::Ident(name) if name.eq_ignore_ascii_case("MIGRATE") => {
3556                // `MIGRATE POLICY MODE TO ...` is the S5B (#714) path.
3557                // Lookahead one token because `MIGRATE` is otherwise
3558                // unused at this layer.
3559                let next = self.peek_next()?.clone();
3560                let is_policy_mode = matches!(&next, Token::Policy)
3561                    || matches!(&next, Token::Ident(name)
3562                        if name.eq_ignore_ascii_case("POLICY"));
3563                if is_policy_mode {
3564                    let expr = self.parse_migrate_policy_mode()?;
3565                    return Ok(SqlCommand::IamPolicy(expr));
3566                }
3567                Err(ParseError::expected(
3568                    vec!["POLICY"],
3569                    self.peek(),
3570                    self.position(),
3571                ))
3572            }
3573            Token::Set => {
3574                self.advance()?;
3575                if self.consume_ident_ci("CONFIG")? {
3576                    let full_key = self.parse_dotted_admin_path(true)?;
3577                    self.expect(Token::Eq)?;
3578                    let value = self.parse_literal_value()?;
3579                    Ok(SqlCommand::SetConfig {
3580                        key: full_key,
3581                        value,
3582                    })
3583                } else if self.consume_ident_ci("SECRET")? {
3584                    let key =
3585                        Self::normalize_secret_admin_path(self.parse_dotted_admin_path(true)?);
3586                    self.expect(Token::Eq)?;
3587                    let value = self.parse_literal_value()?;
3588                    Ok(SqlCommand::SetSecret { key, value })
3589                } else if self.consume_ident_ci("TENANT")? {
3590                    // SET TENANT 'id'  |  SET TENANT = 'id'  |
3591                    // SET TENANT NULL  |  SET TENANT = NULL
3592                    let _ = self.consume(&Token::Eq)?;
3593                    if self.consume_ident_ci("NULL")? {
3594                        Ok(SqlCommand::SetTenant(None))
3595                    } else {
3596                        let value = self.parse_literal_value()?;
3597                        match value {
3598                            Value::Text(s) => Ok(SqlCommand::SetTenant(Some(s.to_string()))),
3599                            Value::Null => Ok(SqlCommand::SetTenant(None)),
3600                            other => Err(ParseError::new(
3601                                format!("SET TENANT expects a text literal or NULL, got {other:?}"),
3602                                self.position(),
3603                            )),
3604                        }
3605                    }
3606                } else {
3607                    Err(ParseError::expected(
3608                        vec!["CONFIG", "SECRET", "TENANT"],
3609                        self.peek(),
3610                        self.position(),
3611                    ))
3612                }
3613            }
3614            Token::Ident(name) if name.eq_ignore_ascii_case("APPLY") => {
3615                self.advance()?;
3616                match self.parse_apply_migration()? {
3617                    QueryExpr::ApplyMigration(q) => Ok(SqlCommand::ApplyMigration(q)),
3618                    other => Err(ParseError::new(
3619                        format!("internal: APPLY MIGRATION produced unexpected kind {other:?}"),
3620                        self.position(),
3621                    )),
3622                }
3623            }
3624            Token::Ident(name) if name.eq_ignore_ascii_case("RESET") => {
3625                // RESET TENANT — session-local clear
3626                self.advance()?;
3627                if self.consume_ident_ci("TENANT")? {
3628                    Ok(SqlCommand::SetTenant(None))
3629                } else {
3630                    Err(ParseError::expected(
3631                        vec!["TENANT"],
3632                        self.peek(),
3633                        self.position(),
3634                    ))
3635                }
3636            }
3637            Token::Ident(name)
3638                if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
3639            {
3640                self.advance()?;
3641                let collection = self.parse_dotted_admin_path(false)?;
3642                let mut query = TableQuery::new("red.describe");
3643                query.filter = Some(Filter::compare(
3644                    FieldRef::column("", "collection"),
3645                    CompareOp::Eq,
3646                    Value::text(collection),
3647                ));
3648                Ok(SqlCommand::Select(query))
3649            }
3650            Token::Desc => {
3651                self.advance()?;
3652                let collection = self.parse_dotted_admin_path(false)?;
3653                let mut query = TableQuery::new("red.describe");
3654                query.filter = Some(Filter::compare(
3655                    FieldRef::column("", "collection"),
3656                    CompareOp::Eq,
3657                    Value::text(collection),
3658                ));
3659                Ok(SqlCommand::Select(query))
3660            }
3661            Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
3662                self.advance()?;
3663                if self.consume(&Token::Create)? || self.consume_ident_ci("CREATE")? {
3664                    if !(self.consume(&Token::Table)? || self.consume_ident_ci("TABLE")?) {
3665                        return Err(ParseError::expected(
3666                            vec!["TABLE"],
3667                            self.peek(),
3668                            self.position(),
3669                        ));
3670                    }
3671                    let collection = self.parse_dotted_admin_path(false)?;
3672                    let mut query = TableQuery::new("red.show_create");
3673                    query.filter = Some(Filter::compare(
3674                        FieldRef::column("", "collection"),
3675                        CompareOp::Eq,
3676                        Value::text(collection),
3677                    ));
3678                    Ok(SqlCommand::Select(query))
3679                } else if self.consume_ident_ci("CONFIG")? {
3680                    // Accept dotted prefixes the same way SET CONFIG does
3681                    // (`SHOW CONFIG durability.mode`), and empty prefix
3682                    // (`SHOW CONFIG`) for a catalog-wide listing.
3683                    let prefix = if !(self.check(&Token::Eof)
3684                        || self.check(&Token::As)
3685                        || self.check(&Token::Format))
3686                    {
3687                        let first = self.expect_ident()?;
3688                        let mut full = first;
3689                        while self.consume(&Token::Dot)? {
3690                            let next = self.expect_ident_or_keyword()?;
3691                            full = format!("{full}.{next}");
3692                        }
3693                        // Match SET CONFIG: lowercase so keyword segments
3694                        // come out consistent with the stored keys.
3695                        Some(full.to_ascii_lowercase())
3696                    } else {
3697                        None
3698                    };
3699                    let as_json = if self.consume(&Token::As)? || self.consume(&Token::Format)? {
3700                        if !self.consume(&Token::Json)? {
3701                            return Err(ParseError::expected(
3702                                vec!["JSON"],
3703                                self.peek(),
3704                                self.position(),
3705                            ));
3706                        }
3707                        true
3708                    } else {
3709                        false
3710                    };
3711                    Ok(SqlCommand::ShowConfig { prefix, as_json })
3712                } else if self.consume_ident_ci("COLLECTIONS")? {
3713                    let mut query = TableQuery::new("red.collections");
3714                    let include_internal = if self.consume_ident_ci("INCLUDING")? {
3715                        if !self.consume_ident_ci("INTERNAL")? {
3716                            return Err(ParseError::expected(
3717                                vec!["INTERNAL"],
3718                                self.peek(),
3719                                self.position(),
3720                            ));
3721                        }
3722                        true
3723                    } else {
3724                        false
3725                    };
3726                    self.parse_table_clauses(&mut query)?;
3727                    if !include_internal {
3728                        let user_filter = query.filter.take();
3729                        let hide_internal = crate::ast::Filter::Compare {
3730                            field: FieldRef::column("", "internal"),
3731                            op: CompareOp::Eq,
3732                            value: Value::Boolean(false),
3733                        };
3734                        query.filter = Some(match user_filter {
3735                            Some(filter) => filter.and(hide_internal),
3736                            None => hide_internal,
3737                        });
3738                    }
3739                    Ok(SqlCommand::Select(query))
3740                } else if self.consume_ident_ci("TABLES")? {
3741                    Ok(SqlCommand::Select(parse_show_collections_by_model(
3742                        self, "table",
3743                    )?))
3744                } else if self.consume_ident_ci("QUEUES")? {
3745                    // Issue #535 — `SHOW QUEUES` desugars to the
3746                    // `red.queues` virtual table (queue-shaped
3747                    // columns), not the filtered `red.collections`
3748                    // view. `INCLUDING INTERNAL` mirrors the
3749                    // `SHOW COLLECTIONS` opt-in: without it, DLQ
3750                    // targets and other auto-created queues are
3751                    // hidden via the `internal = false` filter.
3752                    let mut query = TableQuery::new("red.queues");
3753                    let include_internal = if self.consume_ident_ci("INCLUDING")? {
3754                        if !self.consume_ident_ci("INTERNAL")? {
3755                            return Err(ParseError::expected(
3756                                vec!["INTERNAL"],
3757                                self.peek(),
3758                                self.position(),
3759                            ));
3760                        }
3761                        true
3762                    } else {
3763                        false
3764                    };
3765                    self.parse_table_clauses(&mut query)?;
3766                    if !include_internal {
3767                        let hide_internal = Filter::Compare {
3768                            field: FieldRef::column("", "internal"),
3769                            op: CompareOp::Eq,
3770                            value: Value::Boolean(false),
3771                        };
3772                        add_table_filter(&mut query, hide_internal);
3773                    }
3774                    Ok(SqlCommand::Select(query))
3775                } else if self.consume(&Token::Vectors)? || self.consume_ident_ci("VECTORS")? {
3776                    Ok(SqlCommand::Select(parse_show_collections_by_model(
3777                        self, "vector",
3778                    )?))
3779                } else if self.consume_ident_ci("DOCUMENTS")? {
3780                    Ok(SqlCommand::Select(parse_show_collections_by_model(
3781                        self, "document",
3782                    )?))
3783                } else if self.consume(&Token::Timeseries)?
3784                    || self.consume_ident_ci("TIMESERIES")?
3785                {
3786                    Ok(SqlCommand::Select(parse_show_collections_by_model(
3787                        self,
3788                        "timeseries",
3789                    )?))
3790                } else if self.consume_ident_ci("GRAPHS")? {
3791                    Ok(SqlCommand::Select(parse_show_collections_by_model(
3792                        self, "graph",
3793                    )?))
3794                } else if self.consume_ident_ci("CONFIGS")? {
3795                    Ok(SqlCommand::Select(parse_show_collections_by_model(
3796                        self, "config",
3797                    )?))
3798                } else if self.consume_ident_ci("VAULTS")? {
3799                    Ok(SqlCommand::Select(parse_show_collections_by_model(
3800                        self, "vault",
3801                    )?))
3802                } else if self.consume(&Token::Kv)?
3803                    || self.consume_ident_ci("KV")?
3804                    || self.consume_ident_ci("KVS")?
3805                {
3806                    Ok(SqlCommand::Select(parse_show_collections_by_model(
3807                        self, "kv",
3808                    )?))
3809                } else if self.consume(&Token::Schema)? || self.consume_ident_ci("SCHEMA")? {
3810                    let collection = self.parse_dotted_admin_path(false)?;
3811                    let mut query = TableQuery::new("red.columns");
3812                    query.filter = Some(Filter::compare(
3813                        FieldRef::column("", "collection"),
3814                        CompareOp::Eq,
3815                        Value::text(collection),
3816                    ));
3817                    Ok(SqlCommand::Select(query))
3818                } else if self.consume_ident_ci("INDICES")? || self.consume_ident_ci("INDEXES")? {
3819                    let mut query = TableQuery::new("red.show_indexes");
3820                    if self.consume(&Token::On)? {
3821                        let collection = self.expect_ident_or_keyword()?;
3822                        let filter = Filter::Compare {
3823                            field: FieldRef::column("", "table"),
3824                            op: CompareOp::Eq,
3825                            value: Value::text(collection),
3826                        };
3827                        query.where_expr = Some(filter_to_expr(&filter));
3828                        query.filter = Some(filter);
3829                    }
3830                    self.parse_table_clauses(&mut query)?;
3831                    Ok(SqlCommand::Select(query))
3832                } else if self.consume_ident_ci("POLICIES")? {
3833                    if self.consume(&Token::For)? || self.consume_ident_ci("FOR")? {
3834                        let principal = self.parse_iam_principal_kind()?;
3835                        return Ok(SqlCommand::IamPolicy(QueryExpr::ShowPolicies {
3836                            filter: Some(principal),
3837                        }));
3838                    }
3839                    let mut query = TableQuery::new("red.policies");
3840                    let collection_filter =
3841                        if self.consume(&Token::On)? || self.consume_ident_ci("ON")? {
3842                            let collection = self.parse_dotted_admin_path(false)?;
3843                            Some(Filter::Compare {
3844                                field: FieldRef::TableColumn {
3845                                    table: String::new(),
3846                                    column: "collection".to_string(),
3847                                },
3848                                op: CompareOp::Eq,
3849                                value: Value::text(collection),
3850                            })
3851                        } else {
3852                            None
3853                        };
3854                    self.parse_table_clauses(&mut query)?;
3855                    if let Some(collection_filter) = collection_filter {
3856                        let combined = match query.filter.take() {
3857                            Some(existing) => {
3858                                Filter::And(Box::new(collection_filter), Box::new(existing))
3859                            }
3860                            None => collection_filter,
3861                        };
3862                        query.where_expr = Some(filter_to_expr(&combined));
3863                        query.filter = Some(combined);
3864                    }
3865                    Ok(SqlCommand::Select(query))
3866                } else if self.consume_ident_ci("STATS")? {
3867                    let mut query = TableQuery::new("red.stats");
3868                    let collection = match self.peek().clone() {
3869                        Token::Ident(name) => {
3870                            self.advance()?;
3871                            Some(name)
3872                        }
3873                        Token::String(name) => {
3874                            self.advance()?;
3875                            Some(name)
3876                        }
3877                        _ => None,
3878                    };
3879                    self.parse_table_clauses(&mut query)?;
3880                    if let Some(collection) = collection {
3881                        let filter = Filter::compare(
3882                            FieldRef::column("red.stats", "collection"),
3883                            CompareOp::Eq,
3884                            Value::text(collection),
3885                        );
3886                        let expr = filter_to_expr(&filter);
3887                        query.where_expr = Some(match query.where_expr.take() {
3888                            Some(existing) => Expr::binop(BinOp::And, existing, expr),
3889                            None => expr,
3890                        });
3891                        query.filter = Some(match query.filter.take() {
3892                            Some(existing) => existing.and(filter),
3893                            None => filter,
3894                        });
3895                    }
3896                    Ok(SqlCommand::Select(query))
3897                } else if self.consume_ident_ci("SAMPLE")? {
3898                    let mut query = TableQuery::new(&self.expect_ident()?);
3899                    query.limit = if self.consume(&Token::Limit)? {
3900                        Some(self.parse_integer()? as u64)
3901                    } else {
3902                        Some(10)
3903                    };
3904                    Ok(SqlCommand::Select(query))
3905                } else if self.consume_ident_ci("SECRET")? || self.consume_ident_ci("SECRETS")? {
3906                    let prefix = if !self.check(&Token::Eof) {
3907                        Some(Self::normalize_secret_admin_path(
3908                            self.parse_dotted_admin_path(true)?,
3909                        ))
3910                    } else {
3911                        None
3912                    };
3913                    Ok(SqlCommand::ShowSecrets { prefix })
3914                } else if self.consume_ident_ci("TENANT")? {
3915                    Ok(SqlCommand::ShowTenant)
3916                } else if let Some(expr) = self.parse_show_iam_after_show()? {
3917                    Ok(SqlCommand::IamPolicy(expr))
3918                } else {
3919                    Err(ParseError::expected(
3920                        vec![
3921                            "CONFIG",
3922                            "SECRET",
3923                            "SECRETS",
3924                            "COLLECTIONS",
3925                            "TABLES",
3926                            "QUEUES",
3927                            "VECTORS",
3928                            "DOCUMENTS",
3929                            "TIMESERIES",
3930                            "GRAPHS",
3931                            "KV",
3932                            "SCHEMA",
3933                            "INDICES",
3934                            "INDEXES",
3935                            "SAMPLE",
3936                            "POLICIES",
3937                            "STATS",
3938                            "TENANT",
3939                            "EFFECTIVE",
3940                        ],
3941                        self.peek(),
3942                        self.position(),
3943                    ))
3944                }
3945            }
3946            // Transaction control statements (Phase 1.1 PG parity).
3947            // BEGIN [WORK | TRANSACTION] [ISOLATION LEVEL <mode>]
3948            // START TRANSACTION [ISOLATION LEVEL <mode>]
3949            //
3950            // We only implement SNAPSHOT ISOLATION (our default). We
3951            // accept READ UNCOMMITTED / READ COMMITTED / REPEATABLE
3952            // READ / SNAPSHOT as PG-compatible no-ops, but reject
3953            // SERIALIZABLE outright — the previous behaviour of
3954            // silently degrading to snapshot made the parser
3955            // dishonest. Real SSI (Serializable Snapshot Isolation)
3956            // is tracked as a future milestone.
3957            Token::Begin | Token::Start => {
3958                self.advance()?;
3959                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
3960                // Optional ISOLATION LEVEL clause.
3961                if self.consume_ident_ci("ISOLATION")? {
3962                    self.expect(Token::Level)?;
3963                    // The level identifier can span multiple words
3964                    // (READ UNCOMMITTED / READ COMMITTED / REPEATABLE
3965                    // READ). Collect them case-insensitively.
3966                    let mut parts: Vec<String> = Vec::new();
3967                    if self.consume_ident_ci("READ")? {
3968                        parts.push("READ".to_string());
3969                        if self.consume_ident_ci("UNCOMMITTED")? {
3970                            parts.push("UNCOMMITTED".to_string());
3971                        } else if self.consume_ident_ci("COMMITTED")? {
3972                            parts.push("COMMITTED".to_string());
3973                        } else {
3974                            return Err(ParseError::expected(
3975                                vec!["UNCOMMITTED", "COMMITTED"],
3976                                self.peek(),
3977                                self.position(),
3978                            ));
3979                        }
3980                    } else if self.consume_ident_ci("REPEATABLE")? {
3981                        parts.push("REPEATABLE".to_string());
3982                        if !self.consume_ident_ci("READ")? {
3983                            return Err(ParseError::expected(
3984                                vec!["READ"],
3985                                self.peek(),
3986                                self.position(),
3987                            ));
3988                        }
3989                        parts.push("READ".to_string());
3990                    } else if self.consume_ident_ci("SNAPSHOT")? {
3991                        parts.push("SNAPSHOT".to_string());
3992                    } else if self.consume_ident_ci("SERIALIZABLE")? {
3993                        return Err(ParseError::new(
3994                            "ISOLATION LEVEL SERIALIZABLE is not yet supported — reddb \
3995                             currently provides SNAPSHOT ISOLATION (which PG calls \
3996                             REPEATABLE READ). Use REPEATABLE READ / SNAPSHOT / \
3997                             READ COMMITTED, or omit ISOLATION LEVEL for the default."
3998                                .to_string(),
3999                            self.position(),
4000                        ));
4001                    } else {
4002                        return Err(ParseError::expected(
4003                            vec!["READ", "REPEATABLE", "SNAPSHOT", "SERIALIZABLE"],
4004                            self.peek(),
4005                            self.position(),
4006                        ));
4007                    }
4008                    // All accepted modes map to our snapshot engine today.
4009                    let _ = parts;
4010                }
4011                Ok(SqlCommand::TransactionControl(TxnControl::Begin))
4012            }
4013            // COMMIT [WORK | TRANSACTION]
4014            Token::Commit => {
4015                self.advance()?;
4016                let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
4017                Ok(SqlCommand::TransactionControl(TxnControl::Commit))
4018            }
4019            // ROLLBACK [WORK | TRANSACTION] [TO [SAVEPOINT] name]
4020            // ROLLBACK MIGRATION name
4021            Token::Rollback => {
4022                self.advance()?;
4023                if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
4024                    match self.parse_rollback_migration_after_keyword()? {
4025                        QueryExpr::RollbackMigration(q) => Ok(SqlCommand::RollbackMigration(q)),
4026                        other => Err(ParseError::new(
4027                            format!(
4028                                "internal: ROLLBACK MIGRATION produced unexpected kind {other:?}"
4029                            ),
4030                            self.position(),
4031                        )),
4032                    }
4033                } else {
4034                    let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
4035                    if self.consume(&Token::To)? {
4036                        let _ = self.consume(&Token::Savepoint)?;
4037                        let name = self.expect_ident()?;
4038                        Ok(SqlCommand::TransactionControl(
4039                            TxnControl::RollbackToSavepoint(name),
4040                        ))
4041                    } else {
4042                        Ok(SqlCommand::TransactionControl(TxnControl::Rollback))
4043                    }
4044                }
4045            }
4046            // SAVEPOINT name
4047            Token::Savepoint => {
4048                self.advance()?;
4049                let name = self.expect_ident()?;
4050                Ok(SqlCommand::TransactionControl(TxnControl::Savepoint(name)))
4051            }
4052            // RELEASE [SAVEPOINT] name
4053            Token::Release => {
4054                self.advance()?;
4055                let _ = self.consume(&Token::Savepoint)?;
4056                let name = self.expect_ident()?;
4057                Ok(SqlCommand::TransactionControl(
4058                    TxnControl::ReleaseSavepoint(name),
4059                ))
4060            }
4061            // VACUUM [FULL] [table]
4062            Token::Vacuum => {
4063                self.advance()?;
4064                let full = self.consume(&Token::Full)?;
4065                let target = if self.check(&Token::Eof) {
4066                    None
4067                } else {
4068                    Some(self.expect_ident()?)
4069                };
4070                Ok(SqlCommand::Maintenance(MaintenanceCommand::Vacuum {
4071                    target,
4072                    full,
4073                }))
4074            }
4075            // REFRESH MATERIALIZED VIEW name
4076            Token::Refresh => {
4077                self.advance()?;
4078                self.expect(Token::Materialized)?;
4079                self.expect(Token::View)?;
4080                let name = self.expect_ident()?;
4081                Ok(SqlCommand::RefreshMaterializedView(
4082                    RefreshMaterializedViewQuery { name },
4083                ))
4084            }
4085            // ANALYZE [table]
4086            Token::Analyze => {
4087                self.advance()?;
4088                let target = if self.check(&Token::Eof) {
4089                    None
4090                } else {
4091                    Some(self.expect_ident()?)
4092                };
4093                Ok(SqlCommand::Maintenance(MaintenanceCommand::Analyze {
4094                    target,
4095                }))
4096            }
4097            // COPY table FROM 'path' [WITH (...)] [DELIMITER 'x'] [HEADER [true|false]]
4098            //
4099            // Accepts both PG-style `WITH (FORMAT csv, HEADER true)` and the
4100            // short-form `DELIMITER ',' HEADER`. The only supported format
4101            // today is CSV.
4102            Token::Copy => {
4103                self.advance()?;
4104                let table = self.expect_ident()?;
4105                self.expect(Token::From)?;
4106                let path = self.parse_string()?;
4107
4108                let mut delimiter: Option<char> = None;
4109                let mut has_header = false;
4110                let format = CopyFormat::Csv;
4111
4112                // Optional `WITH (FORMAT csv, HEADER true, DELIMITER ',')` block.
4113                // `WITH` is a reserved keyword token — accept both the keyword
4114                // form and the ident form that non-CTE callers sometimes emit.
4115                if self.consume(&Token::With)? || self.consume_ident_ci("WITH")? {
4116                    self.expect(Token::LParen)?;
4117                    loop {
4118                        if self.consume(&Token::Format)? || self.consume_ident_ci("FORMAT")? {
4119                            let _ = self.consume(&Token::Eq)?;
4120                            // Only CSV for now — accept the ident and move on.
4121                            let _ = self.expect_ident()?;
4122                        } else if self.consume(&Token::Header)? {
4123                            let _ = self.consume(&Token::Eq)?;
4124                            // Accept `HEADER`, `HEADER = true`, `HEADER = false`,
4125                            // or an ident spelling of true/false.
4126                            has_header = match self.peek().clone() {
4127                                Token::True => {
4128                                    self.advance()?;
4129                                    true
4130                                }
4131                                Token::False => {
4132                                    self.advance()?;
4133                                    false
4134                                }
4135                                Token::Ident(ref n) if n.eq_ignore_ascii_case("true") => {
4136                                    self.advance()?;
4137                                    true
4138                                }
4139                                Token::Ident(ref n) if n.eq_ignore_ascii_case("false") => {
4140                                    self.advance()?;
4141                                    false
4142                                }
4143                                _ => true,
4144                            };
4145                        } else if self.consume(&Token::Delimiter)? {
4146                            let _ = self.consume(&Token::Eq)?;
4147                            let s = self.parse_string()?;
4148                            delimiter = s.chars().next();
4149                        } else {
4150                            break;
4151                        }
4152                        if !self.consume(&Token::Comma)? {
4153                            break;
4154                        }
4155                    }
4156                    self.expect(Token::RParen)?;
4157                }
4158
4159                // Short form clauses outside WITH (in either order).
4160                loop {
4161                    if self.consume(&Token::Delimiter)? {
4162                        let s = self.parse_string()?;
4163                        delimiter = s.chars().next();
4164                    } else if self.consume(&Token::Header)? {
4165                        has_header = true;
4166                    } else {
4167                        break;
4168                    }
4169                }
4170
4171                Ok(SqlCommand::CopyFrom(CopyFromQuery {
4172                    table,
4173                    path,
4174                    format,
4175                    delimiter,
4176                    has_header,
4177                }))
4178            }
4179            other => Err(ParseError::expected(
4180                vec![
4181                    "SELECT",
4182                    "FROM",
4183                    "INSERT",
4184                    "UPDATE",
4185                    "DELETE",
4186                    "EXPLAIN",
4187                    "CREATE",
4188                    "DROP",
4189                    "ALTER",
4190                    "SET",
4191                    "SHOW",
4192                    "BEGIN",
4193                    "COMMIT",
4194                    "ROLLBACK",
4195                    "SAVEPOINT",
4196                    "RELEASE",
4197                    "START",
4198                    "VACUUM",
4199                    "ANALYZE",
4200                    "COPY",
4201                    "REFRESH",
4202                    "DESCRIBE",
4203                    "DESC",
4204                ],
4205                other,
4206                self.position(),
4207            )),
4208        }
4209    }
4210}