1use crate::ast::{
2 AlterMetricQuery, AlterQueueQuery, AlterTableQuery, AlterUserStmt, ApplyMigrationQuery,
3 AskQuery, BinOp, CompareOp, ConfigCommand, CopyFormat, CopyFromQuery, CreateCollectionQuery,
4 CreateForeignTableQuery, CreateIndexQuery, CreateMetricQuery, CreateMigrationQuery,
5 CreatePolicyQuery, CreateQueueQuery, CreateSchemaQuery, CreateSequenceQuery, CreateServerQuery,
6 CreateSloQuery, CreateTableQuery, CreateTimeSeriesQuery, CreateTreeQuery, CreateUserStmt,
7 CreateVectorQuery, CreateViewQuery, DeleteQuery, DropCollectionQuery, DropDocumentQuery,
8 DropForeignTableQuery, DropGraphQuery, DropIndexQuery, DropKvQuery, DropPolicyQuery,
9 DropQueueQuery, DropSchemaQuery, DropSequenceQuery, DropServerQuery, DropTableQuery,
10 DropTimeSeriesQuery, DropTreeQuery, DropVectorQuery, DropViewQuery, EventsBackfillQuery,
11 ExplainAlterQuery, ExplainMigrationQuery, Expr, FieldRef, Filter, ForeignColumnDef, GrantStmt,
12 GraphCommand, GraphQuery, HybridQuery, InsertQuery, JoinQuery, KvCommand, MaintenanceCommand,
13 PathQuery, PolicyAction, ProbabilisticCommand, QueryExpr, QueueCommand, QueueSelectQuery,
14 RankOfQuery, RankRangeQuery, RefreshMaterializedViewQuery, RevokeStmt, RollbackMigrationQuery,
15 SearchCommand, Span, TableQuery, TreeCommand, TruncateQuery, TxnControl, UpdateQuery,
16 VectorQuery,
17};
18use crate::lexer::Token;
19use crate::parser::{ParseError, Parser, SafeTokenDisplay};
20use crate::sql_lowering::filter_to_expr;
21use reddb_types::catalog::CollectionModel;
22use reddb_types::types::Value;
23
24#[derive(Debug, Clone)]
29pub enum SqlStatement {
30 Query(SqlQuery),
31 Mutation(SqlMutation),
32 Schema(SqlSchemaCommand),
33 Admin(SqlAdminCommand),
34}
35
36#[derive(Debug, Clone)]
37#[allow(clippy::large_enum_variant)]
38pub enum FrontendStatement {
39 Sql(SqlStatement),
40 Graph(GraphQuery),
41 GraphCommand(GraphCommand),
42 Path(PathQuery),
43 Vector(VectorQuery),
44 Hybrid(HybridQuery),
45 Search(SearchCommand),
46 Ask(AskQuery),
47 QueueSelect(QueueSelectQuery),
48 QueueCommand(QueueCommand),
49 EventsBackfill(EventsBackfillQuery),
50 EventsBackfillStatus { collection: String },
51 TreeCommand(TreeCommand),
52 ProbabilisticCommand(ProbabilisticCommand),
53 KvCommand(KvCommand),
54 ConfigCommand(ConfigCommand),
55 Ranking(QueryExpr),
56}
57
58#[derive(Debug, Clone)]
59pub enum SqlCommand {
60 Select(TableQuery),
61 Join(JoinQuery),
62 Insert(InsertQuery),
63 Update(UpdateQuery),
64 Delete(DeleteQuery),
65 ExplainAlter(ExplainAlterQuery),
66 CreateTable(CreateTableQuery),
67 CreateCollection(CreateCollectionQuery),
68 CreateVector(CreateVectorQuery),
69 DropTable(DropTableQuery),
70 DropGraph(DropGraphQuery),
71 DropVector(DropVectorQuery),
72 DropDocument(DropDocumentQuery),
73 DropKv(DropKvQuery),
74 DropCollection(DropCollectionQuery),
75 Truncate(TruncateQuery),
76 AlterTable(AlterTableQuery),
77 CreateIndex(CreateIndexQuery),
78 DropIndex(DropIndexQuery),
79 CreateTimeSeries(CreateTimeSeriesQuery),
80 CreateMetric(CreateMetricQuery),
81 AlterMetric(AlterMetricQuery),
82 CreateSlo(CreateSloQuery),
83 DropTimeSeries(DropTimeSeriesQuery),
84 CreateQueue(CreateQueueQuery),
85 AlterQueue(AlterQueueQuery),
86 DropQueue(DropQueueQuery),
87 CreateTree(CreateTreeQuery),
88 DropTree(DropTreeQuery),
89 Probabilistic(ProbabilisticCommand),
90 SetConfig {
91 key: String,
92 value: Value,
93 },
94 ShowConfig {
95 prefix: Option<String>,
96 as_json: bool,
97 },
98 SetSecret {
99 key: String,
100 value: Value,
101 },
102 DeleteSecret {
103 key: String,
104 },
105 ShowSecrets {
106 prefix: Option<String>,
107 },
108 SetTenant(Option<String>),
109 ShowTenant,
110 TransactionControl(TxnControl),
111 Maintenance(MaintenanceCommand),
112 CreateSchema(CreateSchemaQuery),
113 DropSchema(DropSchemaQuery),
114 CreateSequence(CreateSequenceQuery),
115 DropSequence(DropSequenceQuery),
116 CopyFrom(CopyFromQuery),
117 CreateView(CreateViewQuery),
118 DropView(DropViewQuery),
119 RefreshMaterializedView(RefreshMaterializedViewQuery),
120 CreatePolicy(CreatePolicyQuery),
121 DropPolicy(DropPolicyQuery),
122 CreateServer(CreateServerQuery),
123 DropServer(DropServerQuery),
124 CreateForeignTable(CreateForeignTableQuery),
125 DropForeignTable(DropForeignTableQuery),
126 Grant(GrantStmt),
128 Revoke(RevokeStmt),
130 AlterUser(AlterUserStmt),
132 CreateUser(CreateUserStmt),
134 IamPolicy(QueryExpr),
139 CreateMigration(CreateMigrationQuery),
140 ApplyMigration(ApplyMigrationQuery),
141 RollbackMigration(RollbackMigrationQuery),
142 ExplainMigration(ExplainMigrationQuery),
143}
144
145fn analytics_v0_non_goal_create(token: &Token) -> Option<String> {
154 let ident = match token {
155 Token::Ident(s) => s,
156 _ => return None,
157 };
158 let upper = ident.to_ascii_uppercase();
159 let message = match upper.as_str() {
160 "ANALYTICS" => {
161 "CREATE ANALYTICS is not supported in Analytics v0 — \
162 use CREATE METRIC <dotted.path> for the metric-centric \
163 catalog (PRD #782 non-goal)"
164 }
165 "EVENT" => {
166 "CREATE EVENT is not supported in Analytics v0 — \
167 event-shaped data lives in ordinary TABLE/DOCUMENT \
168 collections, not a new storage model (PRD #782 non-goal)"
169 }
170 "COHORT" => {
171 "CREATE COHORT is not supported in Analytics v0 — \
172 cohort surfaces are deferred (PRD #782 non-goal)"
173 }
174 "FUNNEL" => {
175 "CREATE FUNNEL is not supported in Analytics v0 — \
176 funnel surfaces are deferred (PRD #782 non-goal)"
177 }
178 "SLA" => {
179 "CREATE SLA is not supported in Analytics v0 — \
180 SLA/legal/commercial contract modeling is post-MVP \
181 (PRD #782 non-goal)"
182 }
183 "ADAPTER" => {
184 "CREATE ADAPTER is not supported in Analytics v0 — \
185 Prometheus/Grafana/Snowplow/Google Analytics adapters \
186 are deferred (PRD #782 non-goal)"
187 }
188 _ => return None,
189 };
190 Some(message.to_string())
191}
192
193fn collection_model_filter(model: &str) -> Filter {
194 Filter::Compare {
195 field: FieldRef::column("", "model"),
196 op: CompareOp::Eq,
197 value: Value::Text(model.to_string().into()),
198 }
199}
200
201#[cfg(test)]
202mod tests {
203 use super::*;
204 use reddb_types::catalog::CollectionModel;
205
206 fn frontend(input: &str) -> FrontendStatement {
207 parse_frontend(input)
208 .unwrap_or_else(|err| panic!("failed to parse frontend {input:?}: {err:?}"))
209 }
210
211 fn expr(input: &str) -> QueryExpr {
212 frontend(input).into_query_expr()
213 }
214
215 fn sql_command(input: &str) -> SqlCommand {
216 sql_command_result(input)
217 .unwrap_or_else(|err| panic!("failed to parse SQL command {input:?}: {err:?}"))
218 }
219
220 fn sql_command_result(input: &str) -> Result<SqlCommand, ParseError> {
221 let mut parser = Parser::new(input)?;
222 parser.parse_sql_command()
223 }
224
225 fn assert_text(value: &Value, expected: &str) {
226 match value {
227 Value::Text(text) => assert_eq!(text.as_ref(), expected),
228 other => panic!("expected text {expected:?}, got {other:?}"),
229 }
230 }
231
232 #[test]
233 fn parse_frontend_routes_core_sql_statements() {
234 let FrontendStatement::Sql(SqlStatement::Query(SqlQuery::Select(query))) =
235 frontend("SELECT * FROM users")
236 else {
237 panic!("SELECT should route to SqlStatement::Query::Select");
238 };
239 assert_eq!(query.table, "users");
240
241 let QueryExpr::Insert(query) = expr("INSERT INTO users (id, name) VALUES (1, 'ada')")
242 else {
243 panic!("INSERT should lower through the SQL frontend");
244 };
245 assert_eq!(query.table, "users");
246 assert_eq!(query.columns, vec!["id", "name"]);
247 assert_eq!(query.values.len(), 1);
248
249 let QueryExpr::Update(query) = expr("UPDATE users SET name = 'ada' WHERE id = 1") else {
250 panic!("UPDATE should lower through the SQL frontend");
251 };
252 assert_eq!(query.table, "users");
253 assert_eq!(query.assignments[0].0, "name");
254
255 let QueryExpr::Delete(query) = expr("DELETE FROM users WHERE id = 1") else {
256 panic!("DELETE should lower through the SQL frontend");
257 };
258 assert_eq!(query.table, "users");
259 assert!(query.filter.is_some());
260
261 let QueryExpr::CreateTable(query) = expr("CREATE TABLE users (id INT, name TEXT)") else {
262 panic!("CREATE TABLE should lower through the SQL frontend");
263 };
264 assert_eq!(query.collection_model, CollectionModel::Table);
265 assert_eq!(query.name, "users");
266 assert_eq!(query.columns[0].name, "id");
267
268 let QueryExpr::DropTable(query) = expr("DROP TABLE IF EXISTS users") else {
269 panic!("DROP TABLE should lower through the SQL frontend");
270 };
271 assert_eq!(query.name, "users");
272 assert!(query.if_exists);
273 }
274
275 #[test]
276 fn parse_frontend_routes_admin_and_catalog_sql() {
277 let QueryExpr::Table(query) = expr("SHOW COLLECTIONS") else {
278 panic!("SHOW COLLECTIONS should become a red.collections table query");
279 };
280 assert_eq!(query.table, "red.collections");
281 assert!(query.filter.is_some());
282
283 let QueryExpr::Table(query) = expr("SHOW TABLES LIMIT 5") else {
284 panic!("SHOW TABLES should become a filtered red.collections table query");
285 };
286 assert_eq!(query.table, "red.collections");
287 assert_eq!(query.limit, Some(5));
288 assert!(query.filter.is_some());
289
290 assert!(matches!(
291 expr("SHOW CONFIG durability.mode"),
292 QueryExpr::ShowConfig { prefix: Some(prefix), as_json: false } if prefix == "durability.mode"
293 ));
294 assert!(matches!(
295 expr("SHOW CONFIG"),
296 QueryExpr::ShowConfig {
297 prefix: None,
298 as_json: false
299 }
300 ));
301 assert!(matches!(
302 expr("SHOW CONFIG runtime.result_cache AS JSON"),
303 QueryExpr::ShowConfig { prefix: Some(prefix), as_json: true } if prefix == "runtime.result_cache"
304 ));
305 assert!(matches!(
306 expr("SHOW CONFIG FORMAT JSON"),
307 QueryExpr::ShowConfig {
308 prefix: None,
309 as_json: true
310 }
311 ));
312
313 let QueryExpr::SetConfig { key, value } = expr("SET CONFIG durability.mode = 'sync'")
314 else {
315 panic!("SET CONFIG should stay on the SQL admin surface");
316 };
317 assert_eq!(key, "durability.mode");
318 assert_text(&value, "sync");
319
320 let QueryExpr::SetSecret { key, value } = expr("SET SECRET provider.api_key = 'sk_test'")
321 else {
322 panic!("SET SECRET should stay on the SQL admin surface");
323 };
324 assert_eq!(key, "provider.api_key");
325 assert_text(&value, "sk_test");
326 assert!(matches!(
327 expr("SET SECRET red.secrets.provider.api_key = 'sk_test'"),
328 QueryExpr::SetSecret { key, .. } if key == "red.secret.provider.api_key"
329 ));
330
331 assert!(matches!(
332 expr("DELETE SECRET provider.api_key"),
333 QueryExpr::DeleteSecret { key } if key == "provider.api_key"
334 ));
335 assert!(matches!(
336 expr("DELETE SECRET red.secrets.provider.api_key"),
337 QueryExpr::DeleteSecret { key } if key == "red.secret.provider.api_key"
338 ));
339 assert!(matches!(
340 expr("SHOW SECRETS provider"),
341 QueryExpr::ShowSecrets { prefix: Some(prefix) } if prefix == "provider"
342 ));
343 assert!(matches!(
344 expr("SHOW SECRETS red.secrets.provider"),
345 QueryExpr::ShowSecrets { prefix: Some(prefix) } if prefix == "red.secret.provider"
346 ));
347 assert!(matches!(
348 expr("SET TENANT 'acme'"),
349 QueryExpr::SetTenant(Some(tenant)) if tenant == "acme"
350 ));
351 assert!(matches!(expr("RESET TENANT"), QueryExpr::SetTenant(None)));
352 assert!(matches!(expr("SHOW TENANT"), QueryExpr::ShowTenant));
353 assert!(matches!(
354 expr("BEGIN ISOLATION LEVEL SNAPSHOT"),
355 QueryExpr::TransactionControl(TxnControl::Begin)
356 ));
357 assert!(matches!(
358 expr("ROLLBACK TO SAVEPOINT sp1"),
359 QueryExpr::TransactionControl(TxnControl::RollbackToSavepoint(name)) if name == "sp1"
360 ));
361 assert!(matches!(
362 expr("VACUUM FULL users"),
363 QueryExpr::MaintenanceCommand(MaintenanceCommand::Vacuum {
364 target: Some(target),
365 full: true,
366 }) if target == "users"
367 ));
368 }
369
370 #[test]
371 fn parse_frontend_routes_extended_schema_sql() {
372 assert!(matches!(
373 expr("CREATE SCHEMA IF NOT EXISTS app"),
374 QueryExpr::CreateSchema(CreateSchemaQuery {
375 name,
376 if_not_exists: true,
377 }) if name == "app"
378 ));
379 assert!(matches!(
380 expr("DROP SCHEMA IF EXISTS app CASCADE"),
381 QueryExpr::DropSchema(DropSchemaQuery {
382 name,
383 if_exists: true,
384 cascade: true,
385 }) if name == "app"
386 ));
387 assert!(matches!(
388 expr("CREATE SEQUENCE IF NOT EXISTS seq START WITH 10 INCREMENT BY 2"),
389 QueryExpr::CreateSequence(CreateSequenceQuery {
390 name,
391 if_not_exists: true,
392 start: 10,
393 increment: 2,
394 }) if name == "seq"
395 ));
396 assert!(matches!(
397 expr("DROP SEQUENCE IF EXISTS seq"),
398 QueryExpr::DropSequence(DropSequenceQuery {
399 name,
400 if_exists: true,
401 }) if name == "seq"
402 ));
403
404 let QueryExpr::CopyFrom(copy) = expr(
405 "COPY users FROM '/tmp/u.csv' WITH (FORMAT = csv, HEADER = true, DELIMITER = ';')",
406 ) else {
407 panic!("COPY should lower through SQL frontend");
408 };
409 assert_eq!(copy.table, "users");
410 assert_eq!(copy.path, "/tmp/u.csv");
411 assert_eq!(copy.format, CopyFormat::Csv);
412 assert_eq!(copy.delimiter, Some(';'));
413 assert!(copy.has_header);
414
415 let QueryExpr::CreateView(view) = expr(
416 "CREATE MATERIALIZED VIEW IF NOT EXISTS mv WITH RETENTION 1 h \
417 AS SELECT id FROM users REFRESH EVERY 5 s",
418 ) else {
419 panic!("CREATE MATERIALIZED VIEW should lower through SQL frontend");
420 };
421 assert_eq!(view.name, "mv");
422 assert!(view.materialized);
423 assert!(view.if_not_exists);
424 assert_eq!(view.retention_duration_ms, Some(3_600_000));
425 assert_eq!(view.refresh_every_ms, Some(5_000));
426 assert!(matches!(*view.query, QueryExpr::Table(_)));
427
428 assert!(matches!(
429 expr("DROP MATERIALIZED VIEW IF EXISTS mv"),
430 QueryExpr::DropView(DropViewQuery {
431 name,
432 materialized: true,
433 if_exists: true,
434 }) if name == "mv"
435 ));
436 assert!(matches!(
437 expr("REFRESH MATERIALIZED VIEW mv"),
438 QueryExpr::RefreshMaterializedView(RefreshMaterializedViewQuery { name }) if name == "mv"
439 ));
440 }
441
442 #[test]
443 fn parse_frontend_routes_fdw_policy_auth_and_migrations() {
444 let QueryExpr::CreateServer(server) = expr(
445 "CREATE SERVER IF NOT EXISTS csvsrv FOREIGN DATA WRAPPER csv OPTIONS (path '/data')",
446 ) else {
447 panic!("CREATE SERVER should lower through SQL frontend");
448 };
449 assert_eq!(server.name, "csvsrv");
450 assert_eq!(server.wrapper, "csv");
451 assert!(server.if_not_exists);
452 assert_eq!(
453 server.options,
454 vec![("path".to_string(), "/data".to_string())]
455 );
456
457 let QueryExpr::CreateForeignTable(table) = expr(
458 "CREATE FOREIGN TABLE IF NOT EXISTS ext_users \
459 (id INT, name TEXT) SERVER csvsrv OPTIONS (file 'users.csv')",
460 ) else {
461 panic!("CREATE FOREIGN TABLE should lower through SQL frontend");
462 };
463 assert_eq!(table.name, "ext_users");
464 assert_eq!(table.server, "csvsrv");
465 assert!(table.if_not_exists);
466 assert_eq!(table.columns.len(), 2);
467 assert!(!table.columns[0].not_null);
468
469 assert!(matches!(
470 expr("DROP SERVER IF EXISTS csvsrv CASCADE"),
471 QueryExpr::DropServer(DropServerQuery {
472 name,
473 if_exists: true,
474 cascade: true,
475 }) if name == "csvsrv"
476 ));
477 assert!(matches!(
478 expr("DROP FOREIGN TABLE IF EXISTS ext_users"),
479 QueryExpr::DropForeignTable(DropForeignTableQuery {
480 name,
481 if_exists: true,
482 }) if name == "ext_users"
483 ));
484
485 let QueryExpr::CreatePolicy(policy) = expr(
486 "CREATE POLICY readonly ON NODES OF mygraph FOR SELECT TO analytics USING (public = 1)",
487 ) else {
488 panic!("CREATE POLICY should lower through SQL frontend");
489 };
490 assert_eq!(policy.name, "readonly");
491 assert_eq!(policy.table, "mygraph");
492 assert_eq!(policy.action, Some(PolicyAction::Select));
493 assert_eq!(policy.role.as_deref(), Some("analytics"));
494 assert_eq!(policy.target_kind.as_ident(), "nodes");
495
496 assert!(matches!(
497 expr("DROP POLICY IF EXISTS readonly ON mygraph"),
498 QueryExpr::DropPolicy(DropPolicyQuery {
499 name,
500 table,
501 if_exists: true,
502 }) if name == "readonly" && table == "mygraph"
503 ));
504
505 assert!(matches!(
506 expr("GRANT SELECT ON TABLE public.users TO tenant1.alice"),
507 QueryExpr::Grant(grant)
508 if grant.actions == vec!["SELECT"]
509 && grant.objects[0].schema.as_deref() == Some("public")
510 ));
511 assert!(matches!(
512 expr("REVOKE GRANT OPTION FOR USAGE ON SCHEMA analytics FROM GROUP analysts"),
513 QueryExpr::Revoke(revoke) if revoke.grant_option_for && revoke.all == false
514 ));
515 assert!(matches!(
516 expr("ALTER USER bob ENABLE SET search_path TO 'public'"),
517 QueryExpr::AlterUser(user)
518 if user.username == "bob" && user.attributes.len() == 2
519 ));
520 assert!(matches!(
521 expr("CREATE USER tenant1.alice WITH PASSWORD 'pw' ROLE write"),
522 QueryExpr::CreateUser(user)
523 if user.tenant.as_deref() == Some("tenant1")
524 && user.username == "alice"
525 && user.password == "pw"
526 && user.role == "write"
527 ));
528
529 assert!(matches!(
530 expr("CREATE POLICY 'readonly' AS '{\"Statement\":[]}'"),
531 QueryExpr::CreateIamPolicy { id, json }
532 if id == "readonly" && json == "{\"Statement\":[]}"
533 ));
534 assert!(matches!(
535 expr("DROP POLICY 'readonly'"),
536 QueryExpr::DropIamPolicy { id } if id == "readonly"
537 ));
538
539 assert!(matches!(
540 expr("CREATE MIGRATION m2 DEPENDS ON m0 BATCH 10 ROWS AS CREATE TABLE accounts (id INT)"),
541 QueryExpr::CreateMigration(migration)
542 if migration.name == "m2"
543 && migration.depends_on == vec!["m0".to_string()]
544 && migration.batch_size == Some(10)
545 ));
546 assert!(matches!(
547 expr("APPLY MIGRATION * FOR TENANT tenant1"),
548 QueryExpr::ApplyMigration(apply)
549 if apply.for_tenant.as_deref() == Some("tenant1")
550 ));
551 assert!(matches!(
552 expr("ROLLBACK MIGRATION m2"),
553 QueryExpr::RollbackMigration(RollbackMigrationQuery { name }) if name == "m2"
554 ));
555 assert!(matches!(
556 expr("EXPLAIN MIGRATION m2"),
557 QueryExpr::ExplainMigration(ExplainMigrationQuery { name }) if name == "m2"
558 ));
559 }
560
561 #[test]
562 fn parse_sql_statement_covers_statement_category_wrapping() {
563 enum Expected {
564 Select,
565 Insert,
566 CreateSchema,
567 SetTenant,
568 }
569
570 let cases = [
571 ("SELECT * FROM users", Expected::Select),
572 ("INSERT INTO users (id) VALUES (1)", Expected::Insert),
573 ("CREATE SCHEMA app", Expected::CreateSchema),
574 ("SET TENANT 'acme'", Expected::SetTenant),
575 ];
576
577 for (input, expected) in cases {
578 let mut parser = Parser::new(input).expect("lexer");
579 let statement = parser
580 .parse_sql_statement()
581 .unwrap_or_else(|err| panic!("failed to parse {input:?}: {err:?}"));
582 let matched = match expected {
583 Expected::Select => matches!(statement, SqlStatement::Query(SqlQuery::Select(_))),
584 Expected::Insert => {
585 matches!(statement, SqlStatement::Mutation(SqlMutation::Insert(_)))
586 }
587 Expected::CreateSchema => matches!(
588 statement,
589 SqlStatement::Schema(SqlSchemaCommand::CreateSchema(_))
590 ),
591 Expected::SetTenant => {
592 matches!(
593 statement,
594 SqlStatement::Admin(SqlAdminCommand::SetTenant(_))
595 )
596 }
597 };
598 assert!(matched, "{input}");
599 }
600 }
601
602 #[test]
603 fn parse_frontend_routes_non_sql_frontends() {
604 let QueryExpr::KvCommand(KvCommand::Get {
605 model,
606 collection,
607 key,
608 }) = expr("KV GET settings.feature")
609 else {
610 panic!("KV GET should route to FrontendStatement::KvCommand");
611 };
612 assert_eq!(model, CollectionModel::Kv);
613 assert_eq!(collection, "settings");
614 assert_eq!(key, "feature");
615
616 let QueryExpr::ConfigCommand(ConfigCommand::Watch {
617 collection,
618 key,
619 prefix,
620 from_lsn,
621 }) = expr("WATCH CONFIG app PREFIX feature FROM LSN 7")
622 else {
623 panic!("WATCH CONFIG should route to FrontendStatement::ConfigCommand");
624 };
625 assert_eq!(collection, "app");
626 assert_eq!(key, "feature");
627 assert!(prefix);
628 assert_eq!(from_lsn, Some(7));
629
630 let QueryExpr::ConfigCommand(ConfigCommand::List {
631 collection,
632 prefix,
633 limit,
634 offset,
635 }) = expr("LIST CONFIG app PREFIX feature LIMIT 3 OFFSET 1")
636 else {
637 panic!("LIST CONFIG should route to FrontendStatement::ConfigCommand");
638 };
639 assert_eq!(collection, "app");
640 assert_eq!(prefix.as_deref(), Some("feature"));
641 assert_eq!(limit, Some(3));
642 assert_eq!(offset, 1);
643
644 let QueryExpr::KvCommand(KvCommand::List {
645 model,
646 collection,
647 prefix,
648 limit,
649 offset,
650 as_json,
651 }) = expr("KV LIST settings PREFIX 'feature.' LIMIT 10 OFFSET 2")
652 else {
653 panic!("KV LIST should route to FrontendStatement::KvCommand");
654 };
655 assert_eq!(model, CollectionModel::Kv);
656 assert_eq!(collection, "settings");
657 assert_eq!(prefix.as_deref(), Some("feature."));
658 assert_eq!(limit, Some(10));
659 assert_eq!(offset, 2);
660 assert!(!as_json);
661
662 let QueryExpr::KvCommand(KvCommand::List {
663 model,
664 collection,
665 prefix,
666 limit,
667 offset,
668 as_json,
669 }) = expr("LIST KV settings PREFIX feature LIMIT 10 OFFSET 2")
670 else {
671 panic!("LIST KV should route to FrontendStatement::KvCommand");
672 };
673 assert_eq!(model, CollectionModel::Kv);
674 assert_eq!(collection, "settings");
675 assert_eq!(prefix.as_deref(), Some("feature"));
676 assert_eq!(limit, Some(10));
677 assert_eq!(offset, 2);
678 assert!(!as_json);
679
680 let QueryExpr::KvCommand(KvCommand::List {
681 model,
682 collection,
683 prefix,
684 as_json,
685 ..
686 }) = expr("KV LIST settings PREFIX feature AS JSON")
687 else {
688 panic!("KV LIST AS JSON should route to FrontendStatement::KvCommand");
689 };
690 assert_eq!(model, CollectionModel::Kv);
691 assert_eq!(collection, "settings");
692 assert_eq!(prefix.as_deref(), Some("feature"));
693 assert!(as_json);
694
695 let QueryExpr::KvCommand(KvCommand::Watch {
696 model,
697 collection,
698 key,
699 prefix,
700 from_lsn,
701 }) = expr("WATCH sessions.user.* FROM LSN 3")
702 else {
703 panic!("bare WATCH should route to FrontendStatement::KvCommand");
704 };
705 assert_eq!(model, CollectionModel::Kv);
706 assert_eq!(collection, "sessions");
707 assert_eq!(key, "user");
708 assert!(prefix);
709 assert_eq!(from_lsn, Some(3));
710
711 let QueryExpr::KvCommand(KvCommand::Watch {
712 model,
713 collection,
714 key,
715 prefix,
716 from_lsn,
717 }) = expr("WATCH VAULT secrets PREFIX api FROM LSN 7")
718 else {
719 panic!("WATCH VAULT should route to FrontendStatement::KvCommand");
720 };
721 assert_eq!(model, CollectionModel::Vault);
722 assert_eq!(collection, "secrets");
723 assert_eq!(key, "api");
724 assert!(prefix);
725 assert_eq!(from_lsn, Some(7));
726
727 let QueryExpr::KvCommand(KvCommand::List {
728 model,
729 collection,
730 prefix,
731 limit,
732 offset,
733 as_json,
734 }) = expr("LIST VAULT secrets PREFIX api LIMIT 10 OFFSET 2")
735 else {
736 panic!("LIST VAULT should route to FrontendStatement::KvCommand");
737 };
738 assert_eq!(model, CollectionModel::Vault);
739 assert_eq!(collection, "secrets");
740 assert_eq!(prefix.as_deref(), Some("api"));
741 assert_eq!(limit, Some(10));
742 assert_eq!(offset, 2);
743 assert!(!as_json);
744
745 assert!(matches!(
746 expr("INVALIDATE CONFIG app feature_flag"),
747 QueryExpr::ConfigCommand(ConfigCommand::InvalidVolatileOperation {
748 operation,
749 collection,
750 key: Some(key),
751 }) if operation == "INVALIDATE" && collection == "app" && key == "feature_flag"
752 ));
753 assert!(matches!(
754 expr("INVALIDATE TAGS [user:42, org:7] FROM sessions"),
755 QueryExpr::KvCommand(KvCommand::InvalidateTags { collection, tags })
756 if collection == "sessions" && tags == vec!["user:42".to_string(), "org:7".to_string()]
757 ));
758
759 let QueryExpr::EventsBackfill(query) =
760 expr("EVENTS BACKFILL users WHERE status = 'active' TO audit LIMIT 10")
761 else {
762 panic!("EVENTS BACKFILL should route to FrontendStatement::EventsBackfill");
763 };
764 assert_eq!(query.collection, "users");
765 assert_eq!(query.where_filter.as_deref(), Some("status = 'active'"));
766 assert_eq!(query.target_queue, "audit");
767 assert_eq!(query.limit, Some(10));
768
769 let QueryExpr::Table(query) = expr("EVENTS STATUS users LIMIT 2") else {
770 panic!("EVENTS STATUS should route through the SQL select surface");
771 };
772 assert_eq!(query.table, "red.subscriptions");
773 assert_eq!(query.limit, Some(2));
774 assert!(query.filter.is_some());
775
776 assert!(matches!(
777 expr("EVENTS BACKFILL STATUS users"),
778 QueryExpr::EventsBackfillStatus { collection } if collection == "users"
779 ));
780 assert!(parse_frontend("LIST UNKNOWN").is_err());
781 assert!(parse_frontend("EVENTS UNKNOWN").is_err());
782 }
783
784 #[test]
785 fn parse_frontend_routes_ranking_reads() {
786 assert!(matches!(
787 expr("RANK OF 42 IN page_rank"),
788 QueryExpr::RankOf(RankOfQuery { ranking, entity_id })
789 if ranking == "page_rank" && entity_id == 42
790 ));
791 assert!(matches!(
792 expr("APPROX RANK OF 7 IN page_rank"),
793 QueryExpr::ApproxRankOf(RankOfQuery { ranking, entity_id })
794 if ranking == "page_rank" && entity_id == 7
795 ));
796 assert!(matches!(
797 expr("RANK RANGE 1 TO 3 IN page_rank"),
798 QueryExpr::RankRange(RankRangeQuery { ranking, lo, hi })
799 if ranking == "page_rank" && lo == 1 && hi == 3
800 ));
801 assert!(matches!(
802 expr("ZRANK page_rank 0"),
803 QueryExpr::RankOf(RankOfQuery { ranking, entity_id })
804 if ranking == "page_rank" && entity_id == 0
805 ));
806 assert!(matches!(
807 expr("ZRANGE page_rank 0 3 WITHSCORES"),
808 QueryExpr::RankRange(RankRangeQuery { ranking, lo, hi })
809 if ranking == "page_rank" && lo == 1 && hi == 4
810 ));
811 assert!(
812 parse_frontend("RANK RANGE 3 TO 1 IN page_rank").is_err(),
813 "rank range must reject reversed bounds"
814 );
815 }
816
817 #[test]
818 fn parse_frontend_covers_multimodel_command_routing() {
819 assert!(matches!(
820 expr("GRAPH CENTRALITY ALGORITHM pagerank LIMIT 5"),
821 QueryExpr::GraphCommand(GraphCommand::Centrality {
822 algorithm,
823 limit: Some(5),
824 ..
825 }) if algorithm == "pagerank"
826 ));
827 assert!(matches!(
828 expr("SEARCH TEXT 'login failure' COLLECTION incidents LIMIT 20 FUZZY"),
829 QueryExpr::SearchCommand(SearchCommand::Text {
830 query,
831 collection: Some(collection),
832 limit: 20,
833 fuzzy: true,
834 ..
835 }) if query == "login failure" && collection == "incidents"
836 ));
837 assert!(matches!(
838 expr("ASK 'why did login fail?' USING openai LIMIT 3"),
839 QueryExpr::Ask(query)
840 if query.question == "why did login fail?"
841 && query.provider.as_deref() == Some("openai")
842 && query.limit == Some(3)
843 ));
844 assert!(matches!(
845 expr("QUEUE LEN tasks"),
846 QueryExpr::QueueCommand(QueueCommand::Len { queue }) if queue == "tasks"
847 ));
848 assert!(matches!(
849 expr("TREE REBALANCE forest.org DRY RUN"),
850 QueryExpr::TreeCommand(TreeCommand::Rebalance {
851 collection,
852 tree_name,
853 dry_run: true,
854 }) if collection == "forest" && tree_name == "org"
855 ));
856 assert!(matches!(
857 expr("HLL COUNT visitors"),
858 QueryExpr::ProbabilisticCommand(ProbabilisticCommand::HllCount { names })
859 if names == vec!["visitors".to_string()]
860 ));
861 }
862
863 #[test]
864 fn sql_command_round_trips_multimodel_schema_variants() {
865 macro_rules! assert_command_round_trip {
866 ($input:expr, $pattern:pat) => {{
867 let command = sql_command($input);
868 assert!(matches!(command, $pattern), "unexpected command for {}", $input);
869
870 let statement = sql_command($input).into_statement();
871 let command = statement.into_command();
872 assert!(
873 matches!(command, $pattern),
874 "statement round trip changed command for {}",
875 $input
876 );
877
878 let expr = sql_command($input).into_query_expr();
879 assert!(
880 !matches!(expr, QueryExpr::Table(TableQuery { table, .. }) if table.is_empty()),
881 "lowering produced an empty table placeholder for {}",
882 $input
883 );
884 }};
885 }
886
887 assert_command_round_trip!(
888 "EXPLAIN ALTER FOR CREATE TABLE users (id INT) FORMAT JSON",
889 SqlCommand::ExplainAlter(_)
890 );
891 assert_command_round_trip!("CREATE TABLE users (id INT)", SqlCommand::CreateTable(_));
892 assert_command_round_trip!("DROP TABLE IF EXISTS users", SqlCommand::DropTable(_));
893 assert_command_round_trip!(
894 "ALTER TABLE users ADD COLUMN status TEXT",
895 SqlCommand::AlterTable(_)
896 );
897 assert_command_round_trip!(
898 "CREATE INDEX idx_email ON users (email) USING HASH",
899 SqlCommand::CreateIndex(_)
900 );
901 assert_command_round_trip!(
902 "DROP INDEX IF EXISTS idx_email ON users",
903 SqlCommand::DropIndex(_)
904 );
905 assert_command_round_trip!("CREATE GRAPH identity", SqlCommand::CreateTable(_));
906 assert_command_round_trip!("CREATE DOCUMENT docs", SqlCommand::CreateTable(_));
907 assert_command_round_trip!(
908 "CREATE VECTOR embeddings DIM 4",
909 SqlCommand::CreateVector(_)
910 );
911 assert_command_round_trip!(
912 "CREATE COLLECTION turbo KIND vector.turbo DIM 3",
913 SqlCommand::CreateCollection(_)
914 );
915 assert_command_round_trip!("CREATE KV settings", SqlCommand::CreateTable(_));
916 assert_command_round_trip!("CREATE CONFIG app", SqlCommand::CreateTable(_));
917 assert_command_round_trip!(
918 "CREATE VAULT secrets WITH OWN MASTER KEY",
919 SqlCommand::CreateTable(_)
920 );
921 assert_command_round_trip!(
922 "CREATE TIMESERIES metrics RETENTION 90 d",
923 SqlCommand::CreateTimeSeries(_)
924 );
925 assert_command_round_trip!(
926 "CREATE METRIC svc.latency TYPE gauge ROLE sli",
927 SqlCommand::CreateMetric(_)
928 );
929 assert_command_round_trip!(
930 "ALTER METRIC svc.latency SET ROLE internal",
931 SqlCommand::AlterMetric(_)
932 );
933 assert_command_round_trip!(
934 "CREATE SLO svc.availability ON svc.latency TARGET 99.9 WINDOW 5 m",
935 SqlCommand::CreateSlo(_)
936 );
937 assert_command_round_trip!(
938 "CREATE QUEUE tasks MAX_SIZE 100",
939 SqlCommand::CreateQueue(_)
940 );
941 assert_command_round_trip!(
942 "ALTER QUEUE tasks SET MODE FANOUT",
943 SqlCommand::AlterQueue(_)
944 );
945 assert_command_round_trip!(
946 "CREATE TREE org IN forest ROOT LABEL root MAX_CHILDREN 4",
947 SqlCommand::CreateTree(_)
948 );
949 assert_command_round_trip!(
950 "CREATE HLL visitors PRECISION 14",
951 SqlCommand::Probabilistic(_)
952 );
953 assert_command_round_trip!(
954 "CREATE SKETCH freqs WIDTH 512 DEPTH 3",
955 SqlCommand::Probabilistic(_)
956 );
957 assert_command_round_trip!(
958 "CREATE FILTER seen CAPACITY 1024",
959 SqlCommand::Probabilistic(_)
960 );
961 assert_command_round_trip!("COPY users FROM '/tmp/u.csv'", SqlCommand::CopyFrom(_));
962 assert_command_round_trip!(
963 "CREATE VIEW active_users AS SELECT * FROM users",
964 SqlCommand::CreateView(_)
965 );
966 assert_command_round_trip!("DROP VIEW active_users", SqlCommand::DropView(_));
967 assert_command_round_trip!(
968 "REFRESH MATERIALIZED VIEW active_users",
969 SqlCommand::RefreshMaterializedView(_)
970 );
971 assert_command_round_trip!(
972 "CREATE SERVER mycsv FOREIGN DATA WRAPPER csv OPTIONS (base_path '/data')",
973 SqlCommand::CreateServer(_)
974 );
975 assert_command_round_trip!(
976 "DROP SERVER IF EXISTS mycsv CASCADE",
977 SqlCommand::DropServer(_)
978 );
979 assert_command_round_trip!(
980 "CREATE FOREIGN TABLE ext_users (id INT, name TEXT) SERVER mycsv OPTIONS (path 'users.csv')",
981 SqlCommand::CreateForeignTable(_)
982 );
983 assert_command_round_trip!(
984 "DROP FOREIGN TABLE IF EXISTS ext_users",
985 SqlCommand::DropForeignTable(_)
986 );
987 }
988
989 #[test]
990 fn sql_command_round_trips_drop_truncate_and_maintenance_variants() {
991 macro_rules! assert_command_round_trip {
992 ($input:expr, $pattern:pat) => {{
993 let command = sql_command($input);
994 assert!(
995 matches!(command, $pattern),
996 "unexpected command for {}",
997 $input
998 );
999 let statement = sql_command($input).into_statement();
1000 assert!(
1001 matches!(statement.into_command(), $pattern),
1002 "statement round trip changed command for {}",
1003 $input
1004 );
1005 }};
1006 }
1007
1008 assert_command_round_trip!("DROP GRAPH IF EXISTS identity", SqlCommand::DropGraph(_));
1009 assert_command_round_trip!(
1010 "DROP VECTOR IF EXISTS embeddings",
1011 SqlCommand::DropVector(_)
1012 );
1013 assert_command_round_trip!("DROP DOCUMENT IF EXISTS docs", SqlCommand::DropDocument(_));
1014 assert_command_round_trip!("DROP KV IF EXISTS settings", SqlCommand::DropKv(_));
1015 assert_command_round_trip!("DROP CONFIG IF EXISTS app", SqlCommand::DropKv(_));
1016 assert_command_round_trip!("DROP VAULT IF EXISTS secrets", SqlCommand::DropKv(_));
1017 assert_command_round_trip!(
1018 "DROP COLLECTION IF EXISTS docs",
1019 SqlCommand::DropCollection(_)
1020 );
1021 assert_command_round_trip!(
1022 "DROP TIMESERIES IF EXISTS metrics",
1023 SqlCommand::DropTimeSeries(_)
1024 );
1025 assert_command_round_trip!(
1026 "DROP HYPERTABLE IF EXISTS metrics",
1027 SqlCommand::DropTimeSeries(_)
1028 );
1029 assert_command_round_trip!("DROP QUEUE IF EXISTS tasks", SqlCommand::DropQueue(_));
1030 assert_command_round_trip!("DROP TREE IF EXISTS org IN forest", SqlCommand::DropTree(_));
1031 assert_command_round_trip!("DROP HLL IF EXISTS visitors", SqlCommand::Probabilistic(_));
1032 assert_command_round_trip!("DROP SKETCH IF EXISTS freqs", SqlCommand::Probabilistic(_));
1033 assert_command_round_trip!("DROP FILTER IF EXISTS seen", SqlCommand::Probabilistic(_));
1034 assert_command_round_trip!(
1035 "TRUNCATE VECTOR IF EXISTS embeddings",
1036 SqlCommand::Truncate(_)
1037 );
1038 assert_command_round_trip!(
1039 "COMMIT WORK",
1040 SqlCommand::TransactionControl(TxnControl::Commit)
1041 );
1042 assert_command_round_trip!(
1043 "ROLLBACK",
1044 SqlCommand::TransactionControl(TxnControl::Rollback)
1045 );
1046 assert_command_round_trip!(
1047 "SAVEPOINT before_batch",
1048 SqlCommand::TransactionControl(TxnControl::Savepoint(_))
1049 );
1050 assert_command_round_trip!(
1051 "RELEASE SAVEPOINT before_batch",
1052 SqlCommand::TransactionControl(TxnControl::ReleaseSavepoint(_))
1053 );
1054 assert_command_round_trip!(
1055 "ANALYZE users",
1056 SqlCommand::Maintenance(MaintenanceCommand::Analyze { .. })
1057 );
1058 assert_command_round_trip!(
1059 "VACUUM",
1060 SqlCommand::Maintenance(MaintenanceCommand::Vacuum { .. })
1061 );
1062 }
1063
1064 #[test]
1065 fn parse_sql_command_covers_show_and_error_branches() {
1066 assert!(matches!(
1067 sql_command("SHOW CREATE TABLE public.users"),
1068 SqlCommand::Select(TableQuery { table, .. }) if table == "red.show_create"
1069 ));
1070 assert!(matches!(
1071 sql_command("SHOW COLLECTIONS INCLUDING INTERNAL LIMIT 2"),
1072 SqlCommand::Select(TableQuery { table, limit: Some(2), .. }) if table == "red.collections"
1073 ));
1074 assert!(matches!(
1075 sql_command("SHOW QUEUES INCLUDING INTERNAL"),
1076 SqlCommand::Select(TableQuery { table, filter: None, .. }) if table == "red.queues"
1077 ));
1078 assert!(matches!(
1079 sql_command("SHOW INDICES ON users"),
1080 SqlCommand::Select(TableQuery { table, filter: Some(_), .. }) if table == "red.show_indexes"
1081 ));
1082 assert!(matches!(
1083 sql_command("SHOW POLICIES ON users WHERE action = 'SELECT'"),
1084 SqlCommand::Select(TableQuery { table, filter: Some(_), .. }) if table == "red.policies"
1085 ));
1086 assert!(matches!(
1087 sql_command("SHOW STATS 'users' WHERE rows > 0"),
1088 SqlCommand::Select(TableQuery { table, filter: Some(_), .. }) if table == "red.stats"
1089 ));
1090 assert!(matches!(
1091 sql_command("SHOW SAMPLE users"),
1092 SqlCommand::Select(TableQuery { table, limit: Some(10), .. }) if table == "users"
1093 ));
1094 assert!(matches!(
1095 sql_command("DESC public.users"),
1096 SqlCommand::Select(TableQuery { table, filter: Some(_), .. }) if table == "red.describe"
1097 ));
1098 assert!(
1099 sql_command_result("CREATE VIEW v WITH RETENTION 1 h AS SELECT * FROM users").is_err()
1100 );
1101 assert!(sql_command_result("CREATE TABLE bad WITH ANALYTICS (centrality)").is_err());
1102 assert!(sql_command_result("BEGIN ISOLATION LEVEL SERIALIZABLE").is_err());
1103 assert!(sql_command_result("EVENTS BACKFILL STATUS users").is_err());
1104 }
1105
1106 #[test]
1107 fn parse_sql_command_covers_remaining_catalog_and_copy_shapes() {
1108 for input in [
1109 "SHOW VECTORS",
1110 "SHOW DOCUMENTS",
1111 "SHOW TIMESERIES",
1112 "SHOW GRAPHS",
1113 "SHOW CONFIGS",
1114 "SHOW VAULTS",
1115 "SHOW KV",
1116 "SHOW SCHEMA public.users",
1117 ] {
1118 assert!(
1119 matches!(sql_command(input), SqlCommand::Select(_)),
1120 "{input}"
1121 );
1122 }
1123
1124 for input in [
1125 "TRUNCATE TABLE users",
1126 "TRUNCATE GRAPH identity",
1127 "TRUNCATE DOCUMENT docs",
1128 "TRUNCATE TIMESERIES metrics",
1129 "TRUNCATE METRICS metrics",
1130 "TRUNCATE KV settings",
1131 "TRUNCATE QUEUE tasks",
1132 "TRUNCATE COLLECTION docs",
1133 ] {
1134 assert!(
1135 matches!(sql_command(input), SqlCommand::Truncate(_)),
1136 "{input}"
1137 );
1138 }
1139 assert!(sql_command_result("TRUNCATE UNKNOWN users").is_err());
1140
1141 let SqlCommand::CopyFrom(copy) = sql_command("COPY users FROM '/tmp/u.csv' WITH (HEADER)")
1142 else {
1143 panic!("expected COPY");
1144 };
1145 assert!(copy.has_header);
1146 assert_eq!(copy.delimiter, None);
1147
1148 let SqlCommand::CopyFrom(copy) =
1149 sql_command("COPY users FROM '/tmp/u.csv' WITH (HEADER = false)")
1150 else {
1151 panic!("expected COPY");
1152 };
1153 assert!(!copy.has_header);
1154
1155 let SqlCommand::CopyFrom(copy) =
1156 sql_command("COPY users FROM '/tmp/u.csv' DELIMITER '|' HEADER")
1157 else {
1158 panic!("expected COPY");
1159 };
1160 assert_eq!(copy.delimiter, Some('|'));
1161 assert!(copy.has_header);
1162 }
1163
1164 #[test]
1165 fn parse_sql_command_covers_remaining_ranking_and_event_errors() {
1166 assert!(matches!(
1167 expr("APPROXIMATE RANK OF 9 IN page_rank"),
1168 QueryExpr::ApproxRankOf(RankOfQuery { ranking, entity_id })
1169 if ranking == "page_rank" && entity_id == 9
1170 ));
1171 assert!(parse_frontend("APPROX OF 7 IN page_rank").is_err());
1172 assert!(parse_frontend("RANK 1 IN page_rank").is_err());
1173 assert!(parse_frontend("RANK RANGE 0 TO 3 IN page_rank").is_err());
1174 assert!(parse_frontend("ZRANK page_rank -1").is_err());
1175 assert!(parse_frontend("ZRANGE page_rank 3 1").is_err());
1176
1177 let QueryExpr::Table(query) = expr("EVENTS STATUS 'users' WHERE active = true") else {
1178 panic!("EVENTS STATUS should accept a quoted collection");
1179 };
1180 assert_eq!(query.table, "red.subscriptions");
1181 assert!(query.filter.is_some());
1182 assert!(query.where_expr.is_some());
1183
1184 let QueryExpr::EventsBackfill(query) = expr("EVENTS BACKFILL users TO audit") else {
1185 panic!("EVENTS BACKFILL should allow omitted filter and limit");
1186 };
1187 assert_eq!(query.collection, "users");
1188 assert_eq!(query.where_filter, None);
1189 assert_eq!(query.limit, None);
1190
1191 assert!(parse_frontend("EVENTS BACKFILL users WHERE TO audit").is_err());
1192 }
1193
1194 #[test]
1195 fn parse_sql_command_covers_analytics_non_goal_rejections() {
1196 for head in ["ANALYTICS", "EVENT", "COHORT", "FUNNEL", "SLA", "ADAPTER"] {
1197 let err = sql_command_result(&format!("CREATE {head} demo"))
1198 .expect_err("analytics v0 non-goal should be rejected");
1199 assert!(err.to_string().contains(&format!("CREATE {head}")), "{err}");
1200 }
1201 }
1202
1203 #[test]
1204 fn parse_sql_command_covers_transaction_isolation_edges() {
1205 for input in [
1206 "BEGIN ISOLATION LEVEL READ UNCOMMITTED",
1207 "BEGIN ISOLATION LEVEL READ COMMITTED",
1208 "BEGIN ISOLATION LEVEL REPEATABLE READ",
1209 "START TRANSACTION ISOLATION LEVEL SNAPSHOT",
1210 ] {
1211 assert!(
1212 matches!(
1213 sql_command(input),
1214 SqlCommand::TransactionControl(TxnControl::Begin)
1215 ),
1216 "{input}"
1217 );
1218 }
1219
1220 assert!(sql_command_result("BEGIN ISOLATION LEVEL READ").is_err());
1221 assert!(sql_command_result("BEGIN ISOLATION LEVEL REPEATABLE").is_err());
1222 assert!(sql_command_result("BEGIN ISOLATION LEVEL CHAOS").is_err());
1223 }
1224
1225 #[test]
1226 fn parse_sql_command_covers_iam_and_hypertable_dispatch_edges() {
1227 assert!(matches!(
1228 expr("CREATE HYPERTABLE metrics TIME_COLUMN ts CHUNK_INTERVAL '1d'"),
1229 QueryExpr::CreateTimeSeries(query)
1230 if query.name == "metrics" && query.hypertable.is_some()
1231 ));
1232 assert!(sql_command_result("CREATE OR TABLE bad (id INT)").is_err());
1233 assert!(sql_command_result("DROP MATERIALIZED TABLE bad").is_err());
1234
1235 assert!(matches!(
1236 expr("ATTACH POLICY 'readonly' TO USER tenant1.alice"),
1237 QueryExpr::AttachPolicy { policy_id, .. } if policy_id == "readonly"
1238 ));
1239 assert!(matches!(
1240 expr("DETACH POLICY 'readonly' FROM GROUP analysts"),
1241 QueryExpr::DetachPolicy { policy_id, .. } if policy_id == "readonly"
1242 ));
1243 assert!(matches!(
1244 expr("SHOW POLICIES FOR USER alice"),
1245 QueryExpr::ShowPolicies { filter: Some(_) }
1246 ));
1247 assert!(matches!(
1248 expr("SHOW EFFECTIVE PERMISSIONS FOR alice"),
1249 QueryExpr::ShowEffectivePermissions { resource: None, .. }
1250 ));
1251 assert!(matches!(
1252 expr("SIMULATE alice ACTION 'iam:PassRole' ON TABLE:public.orders"),
1253 QueryExpr::SimulatePolicy { action, .. } if action == "iam:PassRole"
1254 ));
1255 assert!(matches!(
1256 expr("LINT POLICY JSON '{\"Statement\":[]}'"),
1257 QueryExpr::LintPolicy { .. }
1258 ));
1259 assert!(matches!(
1260 expr("MIGRATE POLICY MODE TO 'policy_only' DRY RUN"),
1261 QueryExpr::MigratePolicyMode {
1262 target,
1263 dry_run: true,
1264 } if target == "policy_only"
1265 ));
1266 assert!(parse_frontend("MIGRATE OTHER").is_err());
1267 }
1268
1269 #[test]
1270 fn parse_frontend_rejects_trailing_tokens() {
1271 let err = parse_frontend("SET TENANT 'acme' junk")
1272 .expect_err("parse_frontend should reject trailing tokens");
1273 assert!(
1274 err.to_string().contains("Unexpected token after query"),
1275 "{err}"
1276 );
1277 }
1278}
1279
1280fn add_table_filter(query: &mut TableQuery, filter: Filter) {
1281 let combined = match query.filter.take() {
1282 Some(existing) => existing.and(filter),
1283 None => filter,
1284 };
1285 query.where_expr = Some(filter_to_expr(&combined));
1286 query.filter = Some(combined);
1287}
1288
1289fn parse_show_collections_by_model(
1290 parser: &mut Parser<'_>,
1291 model: &str,
1292) -> Result<TableQuery, ParseError> {
1293 let mut query = TableQuery::new("red.collections");
1294 parser.parse_table_clauses(&mut query)?;
1295 add_table_filter(&mut query, collection_model_filter(model));
1296 Ok(query)
1297}
1298
1299#[derive(Debug, Clone)]
1300#[allow(clippy::large_enum_variant)]
1301pub enum SqlQuery {
1302 Select(TableQuery),
1303 Join(JoinQuery),
1304}
1305
1306#[derive(Debug, Clone)]
1307pub enum SqlMutation {
1308 Insert(InsertQuery),
1309 Update(UpdateQuery),
1310 Delete(DeleteQuery),
1311}
1312
1313#[derive(Debug, Clone)]
1314pub enum SqlSchemaCommand {
1315 ExplainAlter(ExplainAlterQuery),
1316 CreateTable(CreateTableQuery),
1317 CreateCollection(CreateCollectionQuery),
1318 CreateVector(CreateVectorQuery),
1319 DropTable(DropTableQuery),
1320 DropGraph(DropGraphQuery),
1321 DropVector(DropVectorQuery),
1322 DropDocument(DropDocumentQuery),
1323 DropKv(DropKvQuery),
1324 DropCollection(DropCollectionQuery),
1325 Truncate(TruncateQuery),
1326 AlterTable(AlterTableQuery),
1327 CreateIndex(CreateIndexQuery),
1328 DropIndex(DropIndexQuery),
1329 CreateTimeSeries(CreateTimeSeriesQuery),
1330 CreateMetric(CreateMetricQuery),
1331 AlterMetric(AlterMetricQuery),
1332 CreateSlo(CreateSloQuery),
1333 DropTimeSeries(DropTimeSeriesQuery),
1334 CreateQueue(CreateQueueQuery),
1335 AlterQueue(AlterQueueQuery),
1336 DropQueue(DropQueueQuery),
1337 CreateTree(CreateTreeQuery),
1338 DropTree(DropTreeQuery),
1339 Probabilistic(ProbabilisticCommand),
1340 CreateSchema(CreateSchemaQuery),
1341 DropSchema(DropSchemaQuery),
1342 CreateSequence(CreateSequenceQuery),
1343 DropSequence(DropSequenceQuery),
1344 CopyFrom(CopyFromQuery),
1345 CreateView(CreateViewQuery),
1346 DropView(DropViewQuery),
1347 RefreshMaterializedView(RefreshMaterializedViewQuery),
1348 CreatePolicy(CreatePolicyQuery),
1349 DropPolicy(DropPolicyQuery),
1350 CreateServer(CreateServerQuery),
1351 DropServer(DropServerQuery),
1352 CreateForeignTable(CreateForeignTableQuery),
1353 DropForeignTable(DropForeignTableQuery),
1354 CreateMigration(CreateMigrationQuery),
1355 ApplyMigration(ApplyMigrationQuery),
1356 RollbackMigration(RollbackMigrationQuery),
1357 ExplainMigration(ExplainMigrationQuery),
1358}
1359
1360#[derive(Debug, Clone)]
1361#[allow(clippy::large_enum_variant)]
1362pub enum SqlAdminCommand {
1363 SetConfig {
1364 key: String,
1365 value: Value,
1366 },
1367 ShowConfig {
1368 prefix: Option<String>,
1369 as_json: bool,
1370 },
1371 SetSecret {
1372 key: String,
1373 value: Value,
1374 },
1375 DeleteSecret {
1376 key: String,
1377 },
1378 ShowSecrets {
1379 prefix: Option<String>,
1380 },
1381 SetTenant(Option<String>),
1382 ShowTenant,
1383 TransactionControl(TxnControl),
1384 Maintenance(MaintenanceCommand),
1385 Grant(GrantStmt),
1386 Revoke(RevokeStmt),
1387 AlterUser(AlterUserStmt),
1388 CreateUser(CreateUserStmt),
1389 IamPolicy(QueryExpr),
1390}
1391
1392impl SqlStatement {
1393 pub fn into_command(self) -> SqlCommand {
1394 match self {
1395 SqlStatement::Query(SqlQuery::Select(query)) => SqlCommand::Select(query),
1396 SqlStatement::Query(SqlQuery::Join(query)) => SqlCommand::Join(query),
1397 SqlStatement::Mutation(SqlMutation::Insert(query)) => SqlCommand::Insert(query),
1398 SqlStatement::Mutation(SqlMutation::Update(query)) => SqlCommand::Update(query),
1399 SqlStatement::Mutation(SqlMutation::Delete(query)) => SqlCommand::Delete(query),
1400 SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query)) => {
1401 SqlCommand::ExplainAlter(query)
1402 }
1403 SqlStatement::Schema(SqlSchemaCommand::CreateTable(query)) => {
1404 SqlCommand::CreateTable(query)
1405 }
1406 SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query)) => {
1407 SqlCommand::CreateCollection(query)
1408 }
1409 SqlStatement::Schema(SqlSchemaCommand::CreateVector(query)) => {
1410 SqlCommand::CreateVector(query)
1411 }
1412 SqlStatement::Schema(SqlSchemaCommand::DropTable(query)) => {
1413 SqlCommand::DropTable(query)
1414 }
1415 SqlStatement::Schema(SqlSchemaCommand::DropGraph(query)) => {
1416 SqlCommand::DropGraph(query)
1417 }
1418 SqlStatement::Schema(SqlSchemaCommand::DropVector(query)) => {
1419 SqlCommand::DropVector(query)
1420 }
1421 SqlStatement::Schema(SqlSchemaCommand::DropDocument(query)) => {
1422 SqlCommand::DropDocument(query)
1423 }
1424 SqlStatement::Schema(SqlSchemaCommand::DropKv(query)) => SqlCommand::DropKv(query),
1425 SqlStatement::Schema(SqlSchemaCommand::DropCollection(query)) => {
1426 SqlCommand::DropCollection(query)
1427 }
1428 SqlStatement::Schema(SqlSchemaCommand::Truncate(query)) => SqlCommand::Truncate(query),
1429 SqlStatement::Schema(SqlSchemaCommand::AlterTable(query)) => {
1430 SqlCommand::AlterTable(query)
1431 }
1432 SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query)) => {
1433 SqlCommand::CreateIndex(query)
1434 }
1435 SqlStatement::Schema(SqlSchemaCommand::DropIndex(query)) => {
1436 SqlCommand::DropIndex(query)
1437 }
1438 SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query)) => {
1439 SqlCommand::CreateTimeSeries(query)
1440 }
1441 SqlStatement::Schema(SqlSchemaCommand::CreateMetric(query)) => {
1442 SqlCommand::CreateMetric(query)
1443 }
1444 SqlStatement::Schema(SqlSchemaCommand::AlterMetric(query)) => {
1445 SqlCommand::AlterMetric(query)
1446 }
1447 SqlStatement::Schema(SqlSchemaCommand::CreateSlo(query)) => {
1448 SqlCommand::CreateSlo(query)
1449 }
1450 SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query)) => {
1451 SqlCommand::DropTimeSeries(query)
1452 }
1453 SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query)) => {
1454 SqlCommand::CreateQueue(query)
1455 }
1456 SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query)) => {
1457 SqlCommand::AlterQueue(query)
1458 }
1459 SqlStatement::Schema(SqlSchemaCommand::DropQueue(query)) => {
1460 SqlCommand::DropQueue(query)
1461 }
1462 SqlStatement::Schema(SqlSchemaCommand::CreateTree(query)) => {
1463 SqlCommand::CreateTree(query)
1464 }
1465 SqlStatement::Schema(SqlSchemaCommand::DropTree(query)) => SqlCommand::DropTree(query),
1466 SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command)) => {
1467 SqlCommand::Probabilistic(command)
1468 }
1469 SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value }) => {
1470 SqlCommand::SetConfig { key, value }
1471 }
1472 SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix, as_json }) => {
1473 SqlCommand::ShowConfig { prefix, as_json }
1474 }
1475 SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value }) => {
1476 SqlCommand::SetSecret { key, value }
1477 }
1478 SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key }) => {
1479 SqlCommand::DeleteSecret { key }
1480 }
1481 SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix }) => {
1482 SqlCommand::ShowSecrets { prefix }
1483 }
1484 SqlStatement::Admin(SqlAdminCommand::SetTenant(value)) => SqlCommand::SetTenant(value),
1485 SqlStatement::Admin(SqlAdminCommand::ShowTenant) => SqlCommand::ShowTenant,
1486 SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl)) => {
1487 SqlCommand::TransactionControl(ctl)
1488 }
1489 SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)) => SqlCommand::Maintenance(cmd),
1490 SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)) => SqlCommand::CreateSchema(q),
1491 SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)) => SqlCommand::DropSchema(q),
1492 SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q)) => {
1493 SqlCommand::CreateSequence(q)
1494 }
1495 SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)) => SqlCommand::DropSequence(q),
1496 SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)) => SqlCommand::CopyFrom(q),
1497 SqlStatement::Schema(SqlSchemaCommand::CreateView(q)) => SqlCommand::CreateView(q),
1498 SqlStatement::Schema(SqlSchemaCommand::DropView(q)) => SqlCommand::DropView(q),
1499 SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q)) => {
1500 SqlCommand::RefreshMaterializedView(q)
1501 }
1502 SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)) => SqlCommand::CreatePolicy(q),
1503 SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)) => SqlCommand::DropPolicy(q),
1504 SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)) => SqlCommand::CreateServer(q),
1505 SqlStatement::Schema(SqlSchemaCommand::DropServer(q)) => SqlCommand::DropServer(q),
1506 SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q)) => {
1507 SqlCommand::CreateForeignTable(q)
1508 }
1509 SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q)) => {
1510 SqlCommand::DropForeignTable(q)
1511 }
1512 SqlStatement::Admin(SqlAdminCommand::Grant(s)) => SqlCommand::Grant(s),
1513 SqlStatement::Admin(SqlAdminCommand::Revoke(s)) => SqlCommand::Revoke(s),
1514 SqlStatement::Admin(SqlAdminCommand::AlterUser(s)) => SqlCommand::AlterUser(s),
1515 SqlStatement::Admin(SqlAdminCommand::CreateUser(s)) => SqlCommand::CreateUser(s),
1516 SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)) => SqlCommand::IamPolicy(e),
1517 SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q)) => {
1518 SqlCommand::CreateMigration(q)
1519 }
1520 SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q)) => {
1521 SqlCommand::ApplyMigration(q)
1522 }
1523 SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q)) => {
1524 SqlCommand::RollbackMigration(q)
1525 }
1526 SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q)) => {
1527 SqlCommand::ExplainMigration(q)
1528 }
1529 }
1530 }
1531
1532 pub fn into_query_expr(self) -> QueryExpr {
1533 self.into_command().into_query_expr()
1534 }
1535}
1536
1537impl FrontendStatement {
1538 pub fn into_query_expr(self) -> QueryExpr {
1539 match self {
1540 FrontendStatement::Sql(statement) => statement.into_query_expr(),
1541 FrontendStatement::Graph(query) => QueryExpr::Graph(query),
1542 FrontendStatement::GraphCommand(command) => QueryExpr::GraphCommand(command),
1543 FrontendStatement::Path(query) => QueryExpr::Path(query),
1544 FrontendStatement::Vector(query) => QueryExpr::Vector(query),
1545 FrontendStatement::Hybrid(query) => QueryExpr::Hybrid(query),
1546 FrontendStatement::Search(command) => QueryExpr::SearchCommand(command),
1547 FrontendStatement::Ask(query) => QueryExpr::Ask(query),
1548 FrontendStatement::QueueSelect(query) => QueryExpr::QueueSelect(query),
1549 FrontendStatement::QueueCommand(command) => QueryExpr::QueueCommand(command),
1550 FrontendStatement::EventsBackfill(query) => QueryExpr::EventsBackfill(query),
1551 FrontendStatement::EventsBackfillStatus { collection } => {
1552 QueryExpr::EventsBackfillStatus { collection }
1553 }
1554 FrontendStatement::TreeCommand(command) => QueryExpr::TreeCommand(command),
1555 FrontendStatement::ProbabilisticCommand(command) => {
1556 QueryExpr::ProbabilisticCommand(command)
1557 }
1558 FrontendStatement::KvCommand(command) => QueryExpr::KvCommand(command),
1559 FrontendStatement::ConfigCommand(command) => QueryExpr::ConfigCommand(command),
1560 FrontendStatement::Ranking(expr) => expr,
1561 }
1562 }
1563}
1564
1565pub fn parse_frontend(input: &str) -> Result<FrontendStatement, ParseError> {
1566 let mut parser = Parser::new(input)?;
1567 let statement = parser.parse_frontend_statement()?;
1568 if !parser.check(&Token::Eof) {
1569 return Err(ParseError::new(
1570 format!("Unexpected token after query: {:?}", parser.current.token),
1575 parser.position(),
1576 ));
1577 }
1578 Ok(statement)
1579}
1580
1581impl SqlCommand {
1582 pub fn into_query_expr(self) -> QueryExpr {
1583 match self {
1584 SqlCommand::Select(query) => QueryExpr::Table(query),
1585 SqlCommand::Join(query) => QueryExpr::Join(query),
1586 SqlCommand::Insert(query) => QueryExpr::Insert(query),
1587 SqlCommand::Update(query) => QueryExpr::Update(query),
1588 SqlCommand::Delete(query) => QueryExpr::Delete(query),
1589 SqlCommand::ExplainAlter(query) => QueryExpr::ExplainAlter(query),
1590 SqlCommand::CreateTable(query) => QueryExpr::CreateTable(query),
1591 SqlCommand::CreateCollection(query) => QueryExpr::CreateCollection(query),
1592 SqlCommand::CreateVector(query) => QueryExpr::CreateVector(query),
1593 SqlCommand::DropTable(query) => QueryExpr::DropTable(query),
1594 SqlCommand::DropGraph(query) => QueryExpr::DropGraph(query),
1595 SqlCommand::DropVector(query) => QueryExpr::DropVector(query),
1596 SqlCommand::DropDocument(query) => QueryExpr::DropDocument(query),
1597 SqlCommand::DropKv(query) => QueryExpr::DropKv(query),
1598 SqlCommand::DropCollection(query) => QueryExpr::DropCollection(query),
1599 SqlCommand::Truncate(query) => QueryExpr::Truncate(query),
1600 SqlCommand::AlterTable(query) => QueryExpr::AlterTable(query),
1601 SqlCommand::CreateIndex(query) => QueryExpr::CreateIndex(query),
1602 SqlCommand::DropIndex(query) => QueryExpr::DropIndex(query),
1603 SqlCommand::CreateTimeSeries(query) => QueryExpr::CreateTimeSeries(query),
1604 SqlCommand::CreateMetric(query) => QueryExpr::CreateMetric(query),
1605 SqlCommand::AlterMetric(query) => QueryExpr::AlterMetric(query),
1606 SqlCommand::CreateSlo(query) => QueryExpr::CreateSlo(query),
1607 SqlCommand::DropTimeSeries(query) => QueryExpr::DropTimeSeries(query),
1608 SqlCommand::CreateQueue(query) => QueryExpr::CreateQueue(query),
1609 SqlCommand::AlterQueue(query) => QueryExpr::AlterQueue(query),
1610 SqlCommand::DropQueue(query) => QueryExpr::DropQueue(query),
1611 SqlCommand::CreateTree(query) => QueryExpr::CreateTree(query),
1612 SqlCommand::DropTree(query) => QueryExpr::DropTree(query),
1613 SqlCommand::Probabilistic(command) => QueryExpr::ProbabilisticCommand(command),
1614 SqlCommand::SetConfig { key, value } => QueryExpr::SetConfig { key, value },
1615 SqlCommand::ShowConfig { prefix, as_json } => QueryExpr::ShowConfig { prefix, as_json },
1616 SqlCommand::SetSecret { key, value } => QueryExpr::SetSecret { key, value },
1617 SqlCommand::DeleteSecret { key } => QueryExpr::DeleteSecret { key },
1618 SqlCommand::ShowSecrets { prefix } => QueryExpr::ShowSecrets { prefix },
1619 SqlCommand::SetTenant(value) => QueryExpr::SetTenant(value),
1620 SqlCommand::ShowTenant => QueryExpr::ShowTenant,
1621 SqlCommand::TransactionControl(ctl) => QueryExpr::TransactionControl(ctl),
1622 SqlCommand::Maintenance(cmd) => QueryExpr::MaintenanceCommand(cmd),
1623 SqlCommand::CreateSchema(q) => QueryExpr::CreateSchema(q),
1624 SqlCommand::DropSchema(q) => QueryExpr::DropSchema(q),
1625 SqlCommand::CreateSequence(q) => QueryExpr::CreateSequence(q),
1626 SqlCommand::DropSequence(q) => QueryExpr::DropSequence(q),
1627 SqlCommand::CopyFrom(q) => QueryExpr::CopyFrom(q),
1628 SqlCommand::CreateView(q) => QueryExpr::CreateView(q),
1629 SqlCommand::DropView(q) => QueryExpr::DropView(q),
1630 SqlCommand::RefreshMaterializedView(q) => QueryExpr::RefreshMaterializedView(q),
1631 SqlCommand::CreatePolicy(q) => QueryExpr::CreatePolicy(q),
1632 SqlCommand::DropPolicy(q) => QueryExpr::DropPolicy(q),
1633 SqlCommand::CreateServer(q) => QueryExpr::CreateServer(q),
1634 SqlCommand::DropServer(q) => QueryExpr::DropServer(q),
1635 SqlCommand::CreateForeignTable(q) => QueryExpr::CreateForeignTable(q),
1636 SqlCommand::DropForeignTable(q) => QueryExpr::DropForeignTable(q),
1637 SqlCommand::Grant(s) => QueryExpr::Grant(s),
1638 SqlCommand::Revoke(s) => QueryExpr::Revoke(s),
1639 SqlCommand::AlterUser(s) => QueryExpr::AlterUser(s),
1640 SqlCommand::CreateUser(s) => QueryExpr::CreateUser(s),
1641 SqlCommand::IamPolicy(e) => e,
1642 SqlCommand::CreateMigration(q) => QueryExpr::CreateMigration(q),
1643 SqlCommand::ApplyMigration(q) => QueryExpr::ApplyMigration(q),
1644 SqlCommand::RollbackMigration(q) => QueryExpr::RollbackMigration(q),
1645 SqlCommand::ExplainMigration(q) => QueryExpr::ExplainMigration(q),
1646 }
1647 }
1648
1649 pub fn into_statement(self) -> SqlStatement {
1650 match self {
1651 SqlCommand::Select(query) => SqlStatement::Query(SqlQuery::Select(query)),
1652 SqlCommand::Join(query) => SqlStatement::Query(SqlQuery::Join(query)),
1653 SqlCommand::Insert(query) => SqlStatement::Mutation(SqlMutation::Insert(query)),
1654 SqlCommand::Update(query) => SqlStatement::Mutation(SqlMutation::Update(query)),
1655 SqlCommand::Delete(query) => SqlStatement::Mutation(SqlMutation::Delete(query)),
1656 SqlCommand::ExplainAlter(query) => {
1657 SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query))
1658 }
1659 SqlCommand::CreateTable(query) => {
1660 SqlStatement::Schema(SqlSchemaCommand::CreateTable(query))
1661 }
1662 SqlCommand::CreateCollection(query) => {
1663 SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query))
1664 }
1665 SqlCommand::CreateVector(query) => {
1666 SqlStatement::Schema(SqlSchemaCommand::CreateVector(query))
1667 }
1668 SqlCommand::DropTable(query) => {
1669 SqlStatement::Schema(SqlSchemaCommand::DropTable(query))
1670 }
1671 SqlCommand::DropGraph(query) => {
1672 SqlStatement::Schema(SqlSchemaCommand::DropGraph(query))
1673 }
1674 SqlCommand::DropVector(query) => {
1675 SqlStatement::Schema(SqlSchemaCommand::DropVector(query))
1676 }
1677 SqlCommand::DropDocument(query) => {
1678 SqlStatement::Schema(SqlSchemaCommand::DropDocument(query))
1679 }
1680 SqlCommand::DropKv(query) => SqlStatement::Schema(SqlSchemaCommand::DropKv(query)),
1681 SqlCommand::DropCollection(query) => {
1682 SqlStatement::Schema(SqlSchemaCommand::DropCollection(query))
1683 }
1684 SqlCommand::Truncate(query) => SqlStatement::Schema(SqlSchemaCommand::Truncate(query)),
1685 SqlCommand::AlterTable(query) => {
1686 SqlStatement::Schema(SqlSchemaCommand::AlterTable(query))
1687 }
1688 SqlCommand::CreateIndex(query) => {
1689 SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query))
1690 }
1691 SqlCommand::DropIndex(query) => {
1692 SqlStatement::Schema(SqlSchemaCommand::DropIndex(query))
1693 }
1694 SqlCommand::CreateTimeSeries(query) => {
1695 SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query))
1696 }
1697 SqlCommand::CreateMetric(query) => {
1698 SqlStatement::Schema(SqlSchemaCommand::CreateMetric(query))
1699 }
1700 SqlCommand::AlterMetric(query) => {
1701 SqlStatement::Schema(SqlSchemaCommand::AlterMetric(query))
1702 }
1703 SqlCommand::CreateSlo(query) => {
1704 SqlStatement::Schema(SqlSchemaCommand::CreateSlo(query))
1705 }
1706 SqlCommand::DropTimeSeries(query) => {
1707 SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query))
1708 }
1709 SqlCommand::CreateQueue(query) => {
1710 SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query))
1711 }
1712 SqlCommand::AlterQueue(query) => {
1713 SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query))
1714 }
1715 SqlCommand::DropQueue(query) => {
1716 SqlStatement::Schema(SqlSchemaCommand::DropQueue(query))
1717 }
1718 SqlCommand::CreateTree(query) => {
1719 SqlStatement::Schema(SqlSchemaCommand::CreateTree(query))
1720 }
1721 SqlCommand::DropTree(query) => SqlStatement::Schema(SqlSchemaCommand::DropTree(query)),
1722 SqlCommand::Probabilistic(command) => {
1723 SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command))
1724 }
1725 SqlCommand::SetConfig { key, value } => {
1726 SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value })
1727 }
1728 SqlCommand::ShowConfig { prefix, as_json } => {
1729 SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix, as_json })
1730 }
1731 SqlCommand::SetSecret { key, value } => {
1732 SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value })
1733 }
1734 SqlCommand::DeleteSecret { key } => {
1735 SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key })
1736 }
1737 SqlCommand::ShowSecrets { prefix } => {
1738 SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix })
1739 }
1740 SqlCommand::SetTenant(value) => SqlStatement::Admin(SqlAdminCommand::SetTenant(value)),
1741 SqlCommand::ShowTenant => SqlStatement::Admin(SqlAdminCommand::ShowTenant),
1742 SqlCommand::TransactionControl(ctl) => {
1743 SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl))
1744 }
1745 SqlCommand::Maintenance(cmd) => SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)),
1746 SqlCommand::CreateSchema(q) => SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)),
1747 SqlCommand::DropSchema(q) => SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)),
1748 SqlCommand::CreateSequence(q) => {
1749 SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q))
1750 }
1751 SqlCommand::DropSequence(q) => SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)),
1752 SqlCommand::CopyFrom(q) => SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)),
1753 SqlCommand::CreateView(q) => SqlStatement::Schema(SqlSchemaCommand::CreateView(q)),
1754 SqlCommand::DropView(q) => SqlStatement::Schema(SqlSchemaCommand::DropView(q)),
1755 SqlCommand::RefreshMaterializedView(q) => {
1756 SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q))
1757 }
1758 SqlCommand::CreatePolicy(q) => SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)),
1759 SqlCommand::DropPolicy(q) => SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)),
1760 SqlCommand::CreateServer(q) => SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)),
1761 SqlCommand::DropServer(q) => SqlStatement::Schema(SqlSchemaCommand::DropServer(q)),
1762 SqlCommand::CreateForeignTable(q) => {
1763 SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q))
1764 }
1765 SqlCommand::DropForeignTable(q) => {
1766 SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q))
1767 }
1768 SqlCommand::Grant(s) => SqlStatement::Admin(SqlAdminCommand::Grant(s)),
1769 SqlCommand::Revoke(s) => SqlStatement::Admin(SqlAdminCommand::Revoke(s)),
1770 SqlCommand::AlterUser(s) => SqlStatement::Admin(SqlAdminCommand::AlterUser(s)),
1771 SqlCommand::CreateUser(s) => SqlStatement::Admin(SqlAdminCommand::CreateUser(s)),
1772 SqlCommand::IamPolicy(e) => SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)),
1773 SqlCommand::CreateMigration(q) => {
1774 SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q))
1775 }
1776 SqlCommand::ApplyMigration(q) => {
1777 SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q))
1778 }
1779 SqlCommand::RollbackMigration(q) => {
1780 SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q))
1781 }
1782 SqlCommand::ExplainMigration(q) => {
1783 SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q))
1784 }
1785 }
1786 }
1787}
1788
1789impl<'a> Parser<'a> {
1790 fn parse_events_command(&mut self) -> Result<QueryExpr, ParseError> {
1791 self.expect_ident()?; if self.consume_ident_ci("STATUS")? {
1793 let mut query = TableQuery::new("red.subscriptions");
1794 let collection = match self.peek().clone() {
1795 Token::Ident(name) => {
1796 self.advance()?;
1797 Some(name)
1798 }
1799 Token::String(name) => {
1800 self.advance()?;
1801 Some(name)
1802 }
1803 _ => None,
1804 };
1805 self.parse_table_clauses(&mut query)?;
1806 if let Some(collection) = collection {
1807 let filter = Filter::compare(
1808 FieldRef::column("red.subscriptions", "collection"),
1809 CompareOp::Eq,
1810 Value::text(collection),
1811 );
1812 let expr = filter_to_expr(&filter);
1813 query.where_expr = Some(match query.where_expr.take() {
1814 Some(existing) => Expr::binop(BinOp::And, existing, expr),
1815 None => expr,
1816 });
1817 query.filter = Some(match query.filter.take() {
1818 Some(existing) => existing.and(filter),
1819 None => filter,
1820 });
1821 }
1822 return Ok(QueryExpr::Table(query));
1823 }
1824
1825 if !self.consume_ident_ci("BACKFILL")? {
1826 return Err(ParseError::expected(
1827 vec!["BACKFILL", "STATUS"],
1828 self.peek(),
1829 self.position(),
1830 ));
1831 }
1832
1833 if self.consume_ident_ci("STATUS")? {
1834 let collection = self.expect_ident()?;
1835 return Ok(QueryExpr::EventsBackfillStatus { collection });
1836 }
1837
1838 let collection = self.expect_ident()?;
1839 let where_filter = if self.consume(&Token::Where)? {
1840 let mut parts = Vec::new();
1841 while !self.check(&Token::Eof) && !self.check(&Token::To) {
1842 parts.push(self.peek().to_string());
1843 self.advance()?;
1844 }
1845 if parts.is_empty() {
1846 return Err(ParseError::expected(
1847 vec!["predicate"],
1848 self.peek(),
1849 self.position(),
1850 ));
1851 }
1852 Some(parts.join(" "))
1853 } else {
1854 None
1855 };
1856
1857 self.expect(Token::To)?;
1858 let target_queue = self.expect_ident()?;
1859 let limit = if self.consume(&Token::Limit)? {
1860 Some(self.parse_positive_integer("LIMIT")? as u64)
1861 } else {
1862 None
1863 };
1864
1865 Ok(QueryExpr::EventsBackfill(EventsBackfillQuery {
1866 collection,
1867 where_filter,
1868 target_queue,
1869 limit,
1870 }))
1871 }
1872
1873 pub(crate) fn parse_fdw_options_clause(&mut self) -> Result<Vec<(String, String)>, ParseError> {
1878 if !self.consume(&Token::Options)? {
1879 return Ok(Vec::new());
1880 }
1881 self.expect(Token::LParen)?;
1882 let mut out: Vec<(String, String)> = Vec::new();
1883 loop {
1884 let was_ident = matches!(self.peek(), Token::Ident(_));
1889 let raw = self.expect_ident_or_keyword()?;
1890 let key = if was_ident {
1891 raw
1892 } else {
1893 raw.to_ascii_lowercase()
1894 };
1895 let value = self.parse_string()?;
1897 out.push((key, value));
1898 if !self.consume(&Token::Comma)? {
1899 break;
1900 }
1901 }
1902 self.expect(Token::RParen)?;
1903 Ok(out)
1904 }
1905
1906 pub fn parse_frontend_statement(&mut self) -> Result<FrontendStatement, ParseError> {
1908 match self.peek() {
1909 Token::Select => match self.parse_select_query()? {
1910 QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
1911 SqlQuery::Select(query),
1912 ))),
1913 QueryExpr::Join(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
1914 SqlQuery::Join(query),
1915 ))),
1916 QueryExpr::QueueSelect(query) => Ok(FrontendStatement::QueueSelect(query)),
1917 other => Err(ParseError::new(
1918 format!("internal: SELECT produced unexpected query kind {other:?}"),
1919 self.position(),
1920 )),
1921 },
1922 Token::From
1923 | Token::Insert
1924 | Token::Update
1925 | Token::Truncate
1926 | Token::Create
1927 | Token::Drop
1928 | Token::Alter
1929 | Token::Set
1930 | Token::Begin
1931 | Token::Commit
1932 | Token::Rollback
1933 | Token::Savepoint
1934 | Token::Release
1935 | Token::Start
1936 | Token::Vacuum
1937 | Token::Analyze
1938 | Token::Copy
1939 | Token::Refresh => self.parse_sql_statement().map(FrontendStatement::Sql),
1940 Token::Explain => {
1941 if matches!(
1942 self.peek_next()?,
1943 Token::Ident(name) if name.eq_ignore_ascii_case("ASK")
1944 ) {
1945 match self.parse_explain_ask_query()? {
1946 QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
1947 other => Err(ParseError::new(
1948 format!(
1949 "internal: EXPLAIN ASK produced unexpected query kind {other:?}"
1950 ),
1951 self.position(),
1952 )),
1953 }
1954 } else {
1955 self.parse_sql_statement().map(FrontendStatement::Sql)
1956 }
1957 }
1958 Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
1959 self.parse_sql_statement().map(FrontendStatement::Sql)
1960 }
1961 Token::Ident(name) if name.eq_ignore_ascii_case("RESET") => {
1962 self.parse_sql_statement().map(FrontendStatement::Sql)
1963 }
1964 Token::Ident(name)
1965 if name.eq_ignore_ascii_case("RANK")
1966 || name.eq_ignore_ascii_case("APPROX")
1967 || name.eq_ignore_ascii_case("APPROXIMATE")
1968 || name.eq_ignore_ascii_case("ZRANK")
1969 || name.eq_ignore_ascii_case("ZRANGE") =>
1970 {
1971 self.parse_ranking_read().map(FrontendStatement::Ranking)
1972 }
1973 Token::Desc => self.parse_sql_statement().map(FrontendStatement::Sql),
1974 Token::Ident(name)
1975 if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
1976 {
1977 self.parse_sql_statement().map(FrontendStatement::Sql)
1978 }
1979 Token::Ident(name)
1980 if name.eq_ignore_ascii_case("GRANT")
1981 || name.eq_ignore_ascii_case("REVOKE")
1982 || name.eq_ignore_ascii_case("SIMULATE")
1983 || name.eq_ignore_ascii_case("LINT")
1984 || name.eq_ignore_ascii_case("MIGRATE")
1985 || name.eq_ignore_ascii_case("APPLY") =>
1986 {
1987 self.parse_sql_statement().map(FrontendStatement::Sql)
1988 }
1989 Token::Ident(name) if name.eq_ignore_ascii_case("WATCH") => {
1990 self.advance()?;
1991 if matches!(
1992 self.peek(),
1993 Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
1994 ) {
1995 match self.parse_config_watch_after_watch()? {
1996 QueryExpr::ConfigCommand(command) => {
1997 Ok(FrontendStatement::ConfigCommand(command))
1998 }
1999 other => Err(ParseError::new(
2000 format!(
2001 "internal: WATCH CONFIG produced unexpected query kind {other:?}"
2002 ),
2003 self.position(),
2004 )),
2005 }
2006 } else if matches!(
2007 self.peek(),
2008 Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
2009 ) {
2010 match self.parse_vault_watch_after_watch()? {
2011 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2012 other => Err(ParseError::new(
2013 format!(
2014 "internal: WATCH VAULT produced unexpected query kind {other:?}"
2015 ),
2016 self.position(),
2017 )),
2018 }
2019 } else {
2020 match self.parse_kv_watch(reddb_types::catalog::CollectionModel::Kv)? {
2021 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2022 other => Err(ParseError::new(
2023 format!("internal: WATCH produced unexpected query kind {other:?}"),
2024 self.position(),
2025 )),
2026 }
2027 }
2028 }
2029 Token::List => {
2030 self.advance()?;
2031 if matches!(
2032 self.peek(),
2033 Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
2034 ) {
2035 match self.parse_config_list_after_list()? {
2036 QueryExpr::ConfigCommand(command) => {
2037 Ok(FrontendStatement::ConfigCommand(command))
2038 }
2039 other => Err(ParseError::new(
2040 format!(
2041 "internal: LIST CONFIG produced unexpected query kind {other:?}"
2042 ),
2043 self.position(),
2044 )),
2045 }
2046 } else if matches!(self.peek(), Token::Kv) {
2047 match self.parse_kv_list_after_list()? {
2048 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2049 other => Err(ParseError::new(
2050 format!("internal: LIST KV produced unexpected query kind {other:?}"),
2051 self.position(),
2052 )),
2053 }
2054 } else if matches!(
2055 self.peek(),
2056 Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
2057 ) {
2058 match self.parse_vault_list_after_list()? {
2059 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2060 other => Err(ParseError::new(
2061 format!(
2062 "internal: LIST VAULT produced unexpected query kind {other:?}"
2063 ),
2064 self.position(),
2065 )),
2066 }
2067 } else {
2068 Err(ParseError::expected(
2069 vec!["CONFIG", "KV", "VAULT"],
2070 self.peek(),
2071 self.position(),
2072 ))
2073 }
2074 }
2075 Token::Ident(name) if name.eq_ignore_ascii_case("LIST") => {
2076 self.advance()?;
2077 if matches!(
2078 self.peek(),
2079 Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
2080 ) {
2081 match self.parse_config_list_after_list()? {
2082 QueryExpr::ConfigCommand(command) => {
2083 Ok(FrontendStatement::ConfigCommand(command))
2084 }
2085 other => Err(ParseError::new(
2086 format!(
2087 "internal: LIST CONFIG produced unexpected query kind {other:?}"
2088 ),
2089 self.position(),
2090 )),
2091 }
2092 } else if matches!(self.peek(), Token::Kv) {
2093 match self.parse_kv_list_after_list()? {
2094 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2095 other => Err(ParseError::new(
2096 format!("internal: LIST KV produced unexpected query kind {other:?}"),
2097 self.position(),
2098 )),
2099 }
2100 } else if matches!(
2101 self.peek(),
2102 Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
2103 ) {
2104 match self.parse_vault_list_after_list()? {
2105 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2106 other => Err(ParseError::new(
2107 format!(
2108 "internal: LIST VAULT produced unexpected query kind {other:?}"
2109 ),
2110 self.position(),
2111 )),
2112 }
2113 } else {
2114 Err(ParseError::expected(
2115 vec!["CONFIG", "KV", "VAULT"],
2116 self.peek(),
2117 self.position(),
2118 ))
2119 }
2120 }
2121 Token::Ident(name) if name.eq_ignore_ascii_case("INVALIDATE") => {
2122 if matches!(
2123 self.peek_next()?,
2124 Token::Ident(next) if next.eq_ignore_ascii_case("CONFIG")
2125 ) {
2126 match self.parse_config_command()? {
2127 QueryExpr::ConfigCommand(command) => {
2128 Ok(FrontendStatement::ConfigCommand(command))
2129 }
2130 other => Err(ParseError::new(
2131 format!("internal: CONFIG produced unexpected query kind {other:?}"),
2132 self.position(),
2133 )),
2134 }
2135 } else {
2136 self.advance()?;
2137 match self.parse_kv_invalidate_tags_after_invalidate()? {
2138 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2139 other => Err(ParseError::new(
2140 format!(
2141 "internal: INVALIDATE produced unexpected query kind {other:?}"
2142 ),
2143 self.position(),
2144 )),
2145 }
2146 }
2147 }
2148 Token::Attach | Token::Detach => self.parse_sql_statement().map(FrontendStatement::Sql),
2149 Token::Match => match self.parse_match_query()? {
2150 QueryExpr::Graph(query) => Ok(FrontendStatement::Graph(query)),
2151 other => Err(ParseError::new(
2152 format!("internal: MATCH produced unexpected query kind {other:?}"),
2153 self.position(),
2154 )),
2155 },
2156 Token::Path => match self.parse_path_query()? {
2157 QueryExpr::Path(query) => Ok(FrontendStatement::Path(query)),
2158 other => Err(ParseError::new(
2159 format!("internal: PATH produced unexpected query kind {other:?}"),
2160 self.position(),
2161 )),
2162 },
2163 Token::Vector => match self.parse_vector_query()? {
2164 QueryExpr::Vector(query) => Ok(FrontendStatement::Vector(query)),
2165 other => Err(ParseError::new(
2166 format!("internal: VECTOR produced unexpected query kind {other:?}"),
2167 self.position(),
2168 )),
2169 },
2170 Token::Hybrid => match self.parse_hybrid_query()? {
2171 QueryExpr::Hybrid(query) => Ok(FrontendStatement::Hybrid(query)),
2172 other => Err(ParseError::new(
2173 format!("internal: HYBRID produced unexpected query kind {other:?}"),
2174 self.position(),
2175 )),
2176 },
2177 Token::Graph => match self.parse_graph_command()? {
2178 QueryExpr::GraphCommand(command) => Ok(FrontendStatement::GraphCommand(command)),
2179 other => Err(ParseError::new(
2180 format!("internal: GRAPH produced unexpected query kind {other:?}"),
2181 self.position(),
2182 )),
2183 },
2184 Token::Search => match self.parse_search_command()? {
2185 QueryExpr::SearchCommand(command) => Ok(FrontendStatement::Search(command)),
2186 other => Err(ParseError::new(
2187 format!("internal: SEARCH produced unexpected query kind {other:?}"),
2188 self.position(),
2189 )),
2190 },
2191 Token::Ident(name) if name.eq_ignore_ascii_case("ASK") => {
2192 match self.parse_ask_query()? {
2193 QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
2194 other => Err(ParseError::new(
2195 format!("internal: ASK produced unexpected query kind {other:?}"),
2196 self.position(),
2197 )),
2198 }
2199 }
2200 Token::Ident(name) if name.eq_ignore_ascii_case("UNSEAL") => {
2201 match self.parse_unseal_vault_command()? {
2202 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2203 other => Err(ParseError::new(
2204 format!("internal: UNSEAL VAULT produced unexpected query kind {other:?}"),
2205 self.position(),
2206 )),
2207 }
2208 }
2209 Token::Queue => match self.parse_queue_command()? {
2210 QueryExpr::QueueCommand(command) => Ok(FrontendStatement::QueueCommand(command)),
2211 other => Err(ParseError::new(
2212 format!("internal: QUEUE produced unexpected query kind {other:?}"),
2213 self.position(),
2214 )),
2215 },
2216 Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
2217 match self.parse_events_command()? {
2218 QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
2219 SqlQuery::Select(query),
2220 ))),
2221 QueryExpr::EventsBackfill(query) => {
2222 Ok(FrontendStatement::EventsBackfill(query))
2223 }
2224 QueryExpr::EventsBackfillStatus { collection } => {
2225 Ok(FrontendStatement::EventsBackfillStatus { collection })
2226 }
2227 other => Err(ParseError::new(
2228 format!("internal: EVENTS produced unexpected query kind {other:?}"),
2229 self.position(),
2230 )),
2231 }
2232 }
2233 Token::Kv => match self.parse_kv_command()? {
2234 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2235 other => Err(ParseError::new(
2236 format!("internal: KV produced unexpected query kind {other:?}"),
2237 self.position(),
2238 )),
2239 },
2240 Token::Delete => {
2241 if matches!(
2242 self.peek_next()?,
2243 Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
2244 ) {
2245 match self.parse_config_command()? {
2246 QueryExpr::ConfigCommand(command) => {
2247 Ok(FrontendStatement::ConfigCommand(command))
2248 }
2249 other => Err(ParseError::new(
2250 format!("internal: CONFIG produced unexpected query kind {other:?}"),
2251 self.position(),
2252 )),
2253 }
2254 } else if matches!(
2255 self.peek_next()?,
2256 Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
2257 ) {
2258 match self.parse_vault_lifecycle_command()? {
2259 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2260 other => Err(ParseError::new(
2261 format!("internal: VAULT produced unexpected query kind {other:?}"),
2262 self.position(),
2263 )),
2264 }
2265 } else {
2266 self.parse_sql_statement().map(FrontendStatement::Sql)
2267 }
2268 }
2269 Token::Add => match self.parse_config_command()? {
2270 QueryExpr::ConfigCommand(command) => Ok(FrontendStatement::ConfigCommand(command)),
2271 other => Err(ParseError::new(
2272 format!("internal: CONFIG produced unexpected query kind {other:?}"),
2273 self.position(),
2274 )),
2275 },
2276 Token::Purge => match self.parse_vault_lifecycle_command()? {
2277 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2278 other => Err(ParseError::new(
2279 format!("internal: VAULT produced unexpected query kind {other:?}"),
2280 self.position(),
2281 )),
2282 },
2283 Token::Ident(name)
2284 if name.eq_ignore_ascii_case("PUT")
2285 || name.eq_ignore_ascii_case("GET")
2286 || name.eq_ignore_ascii_case("RESOLVE")
2287 || name.eq_ignore_ascii_case("ROTATE")
2288 || name.eq_ignore_ascii_case("HISTORY")
2289 || name.eq_ignore_ascii_case("PURGE")
2290 || name.eq_ignore_ascii_case("INCR")
2291 || name.eq_ignore_ascii_case("DECR")
2292 || name.eq_ignore_ascii_case("INVALIDATE") =>
2293 {
2294 if matches!(
2295 self.peek_next()?,
2296 Token::Ident(next) if next.eq_ignore_ascii_case("VAULT")
2297 ) {
2298 match self.parse_vault_lifecycle_command()? {
2299 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2300 other => Err(ParseError::new(
2301 format!("internal: VAULT produced unexpected query kind {other:?}"),
2302 self.position(),
2303 )),
2304 }
2305 } else {
2306 match self.parse_config_command()? {
2307 QueryExpr::ConfigCommand(command) => {
2308 Ok(FrontendStatement::ConfigCommand(command))
2309 }
2310 other => Err(ParseError::new(
2311 format!("internal: CONFIG produced unexpected query kind {other:?}"),
2312 self.position(),
2313 )),
2314 }
2315 }
2316 }
2317 Token::Ident(name) if name.eq_ignore_ascii_case("VAULT") => {
2318 match self.parse_vault_command()? {
2319 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
2320 other => Err(ParseError::new(
2321 format!("internal: VAULT produced unexpected query kind {other:?}"),
2322 self.position(),
2323 )),
2324 }
2325 }
2326 Token::Tree => match self.parse_tree_command()? {
2327 QueryExpr::TreeCommand(command) => Ok(FrontendStatement::TreeCommand(command)),
2328 other => Err(ParseError::new(
2329 format!("internal: TREE produced unexpected query kind {other:?}"),
2330 self.position(),
2331 )),
2332 },
2333 Token::Ident(name) if name.eq_ignore_ascii_case("HLL") => {
2334 match self.parse_hll_command()? {
2335 QueryExpr::ProbabilisticCommand(command) => {
2336 Ok(FrontendStatement::ProbabilisticCommand(command))
2337 }
2338 other => Err(ParseError::new(
2339 format!("internal: HLL produced unexpected query kind {other:?}"),
2340 self.position(),
2341 )),
2342 }
2343 }
2344 Token::Ident(name) if name.eq_ignore_ascii_case("SKETCH") => {
2345 match self.parse_sketch_command()? {
2346 QueryExpr::ProbabilisticCommand(command) => {
2347 Ok(FrontendStatement::ProbabilisticCommand(command))
2348 }
2349 other => Err(ParseError::new(
2350 format!("internal: SKETCH produced unexpected query kind {other:?}"),
2351 self.position(),
2352 )),
2353 }
2354 }
2355 Token::Ident(name) if name.eq_ignore_ascii_case("FILTER") => {
2356 match self.parse_filter_command()? {
2357 QueryExpr::ProbabilisticCommand(command) => {
2358 Ok(FrontendStatement::ProbabilisticCommand(command))
2359 }
2360 other => Err(ParseError::new(
2361 format!("internal: FILTER produced unexpected query kind {other:?}"),
2362 self.position(),
2363 )),
2364 }
2365 }
2366 Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => self
2367 .parse_sql_command()
2368 .map(SqlCommand::into_statement)
2369 .map(FrontendStatement::Sql),
2370 other => Err(ParseError::expected(
2371 vec![
2372 "SELECT", "MATCH", "PATH", "FROM", "VECTOR", "HYBRID", "INSERT", "UPDATE",
2373 "DELETE", "TRUNCATE", "CREATE", "DROP", "ALTER", "GRAPH", "SEARCH", "ASK",
2374 "QUEUE", "EVENTS", "KV", "HLL", "TREE", "SKETCH", "FILTER", "SET", "SHOW",
2375 "RESET", "DESCRIBE", "DESC", "RANK", "ZRANK", "ZRANGE",
2376 ],
2377 other,
2378 self.position(),
2379 )),
2380 }
2381 }
2382
2383 fn parse_ranking_read(&mut self) -> Result<QueryExpr, ParseError> {
2384 let head = self.expect_ident()?;
2385 if head.eq_ignore_ascii_case("RANK") {
2386 return self.parse_rank_after_rank(false);
2387 }
2388 if head.eq_ignore_ascii_case("APPROX") || head.eq_ignore_ascii_case("APPROXIMATE") {
2389 if !self.consume_ident_ci("RANK")? {
2390 return Err(ParseError::expected(
2391 vec!["RANK"],
2392 self.peek(),
2393 self.position(),
2394 ));
2395 }
2396 return self.parse_rank_after_rank(true);
2397 }
2398 if head.eq_ignore_ascii_case("ZRANK") {
2399 return self.parse_zrank();
2400 }
2401 if head.eq_ignore_ascii_case("ZRANGE") {
2402 return self.parse_zrange();
2403 }
2404 Err(ParseError::expected(
2405 vec!["RANK", "APPROX RANK", "ZRANK", "ZRANGE"],
2406 self.peek(),
2407 self.position(),
2408 ))
2409 }
2410
2411 fn parse_rank_after_rank(&mut self, approximate: bool) -> Result<QueryExpr, ParseError> {
2412 if self.consume(&Token::Of)? {
2413 let entity_id = self.parse_u64_slot("rank entity id")?;
2414 self.expect(Token::In)?;
2415 let ranking = self.expect_ident()?;
2416 let query = RankOfQuery { ranking, entity_id };
2417 return Ok(if approximate {
2418 QueryExpr::ApproxRankOf(query)
2419 } else {
2420 QueryExpr::RankOf(query)
2421 });
2422 }
2423
2424 if !approximate && self.consume(&Token::Range)? {
2425 let lo = self.parse_positive_u64_slot("rank range lower bound")?;
2426 self.expect(Token::To)?;
2427 let hi = self.parse_positive_u64_slot("rank range upper bound")?;
2428 if hi < lo {
2429 return Err(ParseError::value_out_of_range(
2430 "rank range upper bound",
2431 "must be greater than or equal to the lower bound",
2432 self.position(),
2433 ));
2434 }
2435 self.expect(Token::In)?;
2436 let ranking = self.expect_ident()?;
2437 return Ok(QueryExpr::RankRange(RankRangeQuery { ranking, lo, hi }));
2438 }
2439
2440 Err(ParseError::expected(
2441 if approximate {
2442 vec!["OF"]
2443 } else {
2444 vec!["OF", "RANGE"]
2445 },
2446 self.peek(),
2447 self.position(),
2448 ))
2449 }
2450
2451 fn parse_zrank(&mut self) -> Result<QueryExpr, ParseError> {
2452 let ranking = self.expect_ident()?;
2453 let entity_id = self.parse_u64_slot("ZRANK entity id")?;
2454 Ok(QueryExpr::RankOf(RankOfQuery { ranking, entity_id }))
2455 }
2456
2457 fn parse_zrange(&mut self) -> Result<QueryExpr, ParseError> {
2458 let ranking = self.expect_ident()?;
2459 let start = self.parse_u64_slot("ZRANGE start")?;
2460 let stop = self.parse_u64_slot("ZRANGE stop")?;
2461 if stop < start {
2462 return Err(ParseError::value_out_of_range(
2463 "ZRANGE stop",
2464 "must be greater than or equal to start",
2465 self.position(),
2466 ));
2467 }
2468 let _with_scores = self.consume_ident_ci("WITHSCORES")?;
2469 Ok(QueryExpr::RankRange(RankRangeQuery {
2470 ranking,
2471 lo: start + 1,
2472 hi: stop + 1,
2473 }))
2474 }
2475
2476 fn parse_positive_u64_slot(&mut self, field: &'static str) -> Result<u64, ParseError> {
2477 let value = self.parse_u64_slot(field)?;
2478 if value == 0 {
2479 return Err(ParseError::value_out_of_range(
2480 field,
2481 "must be a positive integer",
2482 self.position(),
2483 ));
2484 }
2485 Ok(value)
2486 }
2487
2488 fn parse_u64_slot(&mut self, field: &'static str) -> Result<u64, ParseError> {
2489 let pos = self.position();
2490 if matches!(self.peek(), Token::Minus | Token::Dash) {
2491 return Err(ParseError::value_out_of_range(
2492 field,
2493 "must be an unsigned integer",
2494 pos,
2495 ));
2496 }
2497 let raw = self.parse_integer()?;
2498 u64::try_from(raw)
2499 .map_err(|_| ParseError::value_out_of_range(field, "must be an unsigned integer", pos))
2500 }
2501
2502 pub fn parse_sql_statement(&mut self) -> Result<SqlStatement, ParseError> {
2504 self.parse_sql_command().map(SqlCommand::into_statement)
2505 }
2506
2507 fn parse_dotted_admin_path(&mut self, lowercase: bool) -> Result<String, ParseError> {
2508 let mut path = self.expect_ident()?;
2509 while self.consume(&Token::Dot)? {
2510 let next = self.expect_ident_or_keyword()?;
2511 path = format!("{path}.{next}");
2512 }
2513 Ok(if lowercase {
2514 path.to_ascii_lowercase()
2515 } else {
2516 path
2517 })
2518 }
2519
2520 fn normalize_secret_admin_path(path: String) -> String {
2521 if let Some(rest) = path.strip_prefix("red.secrets.") {
2522 format!("red.secret.{rest}")
2523 } else if path == "red.secrets" {
2524 "red.secret".to_string()
2525 } else {
2526 path
2527 }
2528 }
2529
2530 #[inline(never)]
2539 fn parse_create_command(&mut self) -> Result<SqlCommand, ParseError> {
2540 let pos = self.position();
2541 self.advance()?;
2542
2543 let mut or_replace = false;
2547 if self.consume(&Token::Or)? || self.consume_ident_ci("OR")? {
2548 let _ = self.consume_ident_ci("REPLACE")?;
2549 or_replace = true;
2550 }
2551 let materialized = self.consume(&Token::Materialized)?;
2552 if self.check(&Token::View) {
2553 self.advance()?;
2554 let if_not_exists = self.match_if_not_exists()?;
2555 let name = self.expect_ident()?;
2556 let mut retention_duration_ms: Option<u64> = None;
2564 if self.check(&Token::With) {
2565 self.advance()?;
2566 if !self.consume(&Token::Retention)? && !self.consume_ident_ci("RETENTION")? {
2567 return Err(ParseError::expected(
2568 vec!["RETENTION"],
2569 self.peek(),
2570 self.position(),
2571 ));
2572 }
2573 if !materialized {
2574 return Err(ParseError::new(
2575 "WITH RETENTION is only valid on \
2576 CREATE MATERIALIZED VIEW"
2577 .to_string(),
2578 self.position(),
2579 ));
2580 }
2581 let value = self.parse_float()?;
2582 let unit_mult = self.parse_duration_unit()?;
2583 retention_duration_ms = Some((value * unit_mult).round() as u64);
2584 }
2585 if !self.consume(&Token::As)? && !self.consume_ident_ci("AS")? {
2588 return Err(ParseError::expected(
2589 vec!["AS"],
2590 self.peek(),
2591 self.position(),
2592 ));
2593 }
2594 let body = self.parse_sql_command()?.into_query_expr();
2597 let mut refresh_every_ms: Option<u64> = None;
2602 if self.check(&Token::Refresh) {
2603 if !materialized {
2604 return Err(ParseError::new(
2605 "REFRESH EVERY is only valid on \
2606 CREATE MATERIALIZED VIEW"
2607 .to_string(),
2608 self.position(),
2609 ));
2610 }
2611 self.advance()?;
2612 if !self.consume_ident_ci("EVERY")? {
2613 return Err(ParseError::expected(
2614 vec!["EVERY"],
2615 self.peek(),
2616 self.position(),
2617 ));
2618 }
2619 let value = self.parse_float()?;
2620 let unit_mult = self.parse_duration_unit()?;
2621 refresh_every_ms = Some((value * unit_mult).round() as u64);
2622 }
2623 return Ok(SqlCommand::CreateView(CreateViewQuery {
2624 name,
2625 query: Box::new(body),
2626 materialized,
2627 if_not_exists,
2628 or_replace,
2629 refresh_every_ms,
2630 retention_duration_ms,
2631 }));
2632 }
2633 if or_replace || materialized {
2636 return Err(ParseError::expected(
2637 vec!["VIEW"],
2638 self.peek(),
2639 self.position(),
2640 ));
2641 }
2642
2643 if matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("USER")) {
2644 let stmt = self.parse_create_user_statement()?;
2645 Ok(SqlCommand::CreateUser(stmt))
2646 } else if self.check(&Token::Index) || self.check(&Token::Unique) {
2647 match self.parse_create_index_query()? {
2648 QueryExpr::CreateIndex(query) => Ok(SqlCommand::CreateIndex(query)),
2649 other => Err(ParseError::new(
2650 format!("internal: CREATE INDEX produced unexpected kind {other:?}"),
2651 self.position(),
2652 )),
2653 }
2654 } else if self.check(&Token::Table) {
2655 self.expect(Token::Table)?;
2656 match self.parse_create_table_body()? {
2657 QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
2658 other => Err(ParseError::new(
2659 format!("internal: CREATE TABLE produced unexpected kind {other:?}"),
2660 self.position(),
2661 )),
2662 }
2663 } else if self.check(&Token::Graph) {
2664 self.advance()?;
2665 match self.parse_create_collection_model_body(CollectionModel::Graph)? {
2666 QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
2667 other => Err(ParseError::new(
2668 format!("internal: CREATE GRAPH produced unexpected kind {other:?}"),
2669 self.position(),
2670 )),
2671 }
2672 } else if self.check(&Token::Document) {
2673 self.advance()?;
2674 match self.parse_create_collection_model_body(CollectionModel::Document)? {
2675 QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
2676 other => Err(ParseError::new(
2677 format!("internal: CREATE DOCUMENT produced unexpected kind {other:?}"),
2678 self.position(),
2679 )),
2680 }
2681 } else if self.check(&Token::Vector) {
2682 self.advance()?;
2683 match self.parse_create_vector_body()? {
2684 QueryExpr::CreateVector(query) => Ok(SqlCommand::CreateVector(query)),
2685 other => Err(ParseError::new(
2686 format!("internal: CREATE VECTOR produced unexpected kind {other:?}"),
2687 self.position(),
2688 )),
2689 }
2690 } else if self.check(&Token::Collection) {
2691 self.advance()?;
2692 match self.parse_create_collection_body()? {
2693 QueryExpr::CreateCollection(query) => Ok(SqlCommand::CreateCollection(query)),
2694 other => Err(ParseError::new(
2695 format!("internal: CREATE COLLECTION produced unexpected kind {other:?}"),
2696 self.position(),
2697 )),
2698 }
2699 } else if self.check(&Token::Kv) {
2700 self.advance()?;
2701 match self.parse_create_keyed_body(CollectionModel::Kv)? {
2702 QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
2703 other => Err(ParseError::new(
2704 format!("internal: CREATE KV produced unexpected kind {other:?}"),
2705 self.position(),
2706 )),
2707 }
2708 } else if self.consume_ident_ci("CONFIG")? {
2709 match self.parse_create_keyed_body(CollectionModel::Config)? {
2710 QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
2711 other => Err(ParseError::new(
2712 format!("internal: CREATE CONFIG produced unexpected kind {other:?}"),
2713 self.position(),
2714 )),
2715 }
2716 } else if self.consume_ident_ci("VAULT")? {
2717 match self.parse_create_keyed_body(CollectionModel::Vault)? {
2718 QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
2719 other => Err(ParseError::new(
2720 format!("internal: CREATE VAULT produced unexpected kind {other:?}"),
2721 self.position(),
2722 )),
2723 }
2724 } else if self.check(&Token::Timeseries) {
2725 self.advance()?;
2726 match self.parse_create_timeseries_body()? {
2727 QueryExpr::CreateTimeSeries(query) => Ok(SqlCommand::CreateTimeSeries(query)),
2728 other => Err(ParseError::new(
2729 format!("internal: CREATE TIMESERIES produced unexpected kind {other:?}"),
2730 self.position(),
2731 )),
2732 }
2733 } else if self.check(&Token::Metric) {
2734 self.advance()?;
2735 match self.parse_create_metric_body()? {
2736 QueryExpr::CreateMetric(query) => Ok(SqlCommand::CreateMetric(query)),
2737 other => Err(ParseError::new(
2738 format!("internal: CREATE METRIC produced unexpected kind {other:?}"),
2739 self.position(),
2740 )),
2741 }
2742 } else if self.consume_ident_ci("METRICS")? {
2743 match self.parse_create_metrics_body()? {
2744 QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
2745 other => Err(ParseError::new(
2746 format!("internal: CREATE METRICS produced unexpected kind {other:?}"),
2747 self.position(),
2748 )),
2749 }
2750 } else if self.consume_ident_ci("SLO")? {
2751 match self.parse_create_slo_body()? {
2752 QueryExpr::CreateSlo(query) => Ok(SqlCommand::CreateSlo(query)),
2753 other => Err(ParseError::new(
2754 format!("internal: CREATE SLO produced unexpected kind {other:?}"),
2755 self.position(),
2756 )),
2757 }
2758 } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE")) {
2759 self.advance()?;
2760 match self.parse_create_hypertable_body()? {
2761 QueryExpr::CreateTimeSeries(query) => Ok(SqlCommand::CreateTimeSeries(query)),
2762 other => Err(ParseError::new(
2763 format!("internal: CREATE HYPERTABLE produced unexpected kind {other:?}"),
2764 self.position(),
2765 )),
2766 }
2767 } else if self.check(&Token::Queue) {
2768 self.advance()?;
2769 match self.parse_create_queue_body()? {
2770 QueryExpr::CreateQueue(query) => Ok(SqlCommand::CreateQueue(query)),
2771 other => Err(ParseError::new(
2772 format!("internal: CREATE QUEUE produced unexpected kind {other:?}"),
2773 self.position(),
2774 )),
2775 }
2776 } else if self.check(&Token::Tree) {
2777 self.advance()?;
2778 match self.parse_create_tree_body()? {
2779 QueryExpr::CreateTree(query) => Ok(SqlCommand::CreateTree(query)),
2780 other => Err(ParseError::new(
2781 format!("internal: CREATE TREE produced unexpected kind {other:?}"),
2782 self.position(),
2783 )),
2784 }
2785 } else if matches!(self.peek(), Token::Ident(n) if
2786 n.eq_ignore_ascii_case("HLL") ||
2787 n.eq_ignore_ascii_case("SKETCH") ||
2788 n.eq_ignore_ascii_case("FILTER"))
2789 {
2790 match self.parse_create_probabilistic()? {
2791 QueryExpr::ProbabilisticCommand(command) => Ok(SqlCommand::Probabilistic(command)),
2792 other => Err(ParseError::new(
2793 format!("internal: CREATE probabilistic produced unexpected kind {other:?}"),
2794 self.position(),
2795 )),
2796 }
2797 } else if self.check(&Token::Schema) {
2798 self.advance()?;
2800 let if_not_exists = self.match_if_not_exists()?;
2801 let name = self.expect_ident()?;
2802 Ok(SqlCommand::CreateSchema(CreateSchemaQuery {
2803 name,
2804 if_not_exists,
2805 }))
2806 } else if self.check(&Token::Policy) {
2807 self.advance()?;
2812 if matches!(self.peek(), Token::String(_)) {
2813 let expr = self.parse_create_iam_policy_after_keywords()?;
2815 return Ok(SqlCommand::IamPolicy(expr));
2821 }
2822 let name = self.expect_ident()?;
2823 self.expect(Token::On)?;
2824
2825 let (target_kind, table) = {
2826 use crate::ast::PolicyTargetKind;
2827 let kw = match self.peek() {
2828 Token::Ident(s) => Some(s.to_ascii_uppercase()),
2829 _ => None,
2830 };
2831 let kind = kw.as_deref().and_then(|k| match k {
2832 "NODES" => Some(PolicyTargetKind::Nodes),
2833 "EDGES" => Some(PolicyTargetKind::Edges),
2834 "VECTORS" => Some(PolicyTargetKind::Vectors),
2835 "MESSAGES" => Some(PolicyTargetKind::Messages),
2836 "POINTS" => Some(PolicyTargetKind::Points),
2837 "DOCUMENTS" => Some(PolicyTargetKind::Documents),
2838 _ => None,
2839 });
2840 if let Some(k) = kind {
2841 self.advance()?;
2842 self.expect(Token::Of)?;
2843 let coll = self.expect_ident()?;
2844 (k, coll)
2845 } else {
2846 let coll = self.expect_ident()?;
2847 (PolicyTargetKind::Table, coll)
2848 }
2849 };
2850
2851 let action = if self.consume(&Token::For)? {
2852 let a = match self.peek() {
2853 Token::Select => {
2854 self.advance()?;
2855 Some(PolicyAction::Select)
2856 }
2857 Token::Insert => {
2858 self.advance()?;
2859 Some(PolicyAction::Insert)
2860 }
2861 Token::Update => {
2862 self.advance()?;
2863 Some(PolicyAction::Update)
2864 }
2865 Token::Delete => {
2866 self.advance()?;
2867 Some(PolicyAction::Delete)
2868 }
2869 Token::All => {
2870 self.advance()?;
2871 None
2872 }
2873 _ => None,
2874 };
2875 a
2876 } else {
2877 None
2878 };
2879
2880 let role = if self.consume(&Token::To)? {
2881 Some(self.expect_ident()?)
2882 } else {
2883 None
2884 };
2885
2886 self.expect(Token::Using)?;
2887 self.expect(Token::LParen)?;
2888 let filter = self.parse_filter()?;
2889 self.expect(Token::RParen)?;
2890
2891 Ok(SqlCommand::CreatePolicy(CreatePolicyQuery {
2892 name,
2893 table,
2894 action,
2895 role,
2896 using: Box::new(filter),
2897 target_kind,
2898 }))
2899 } else if self.check(&Token::Server) {
2900 self.advance()?;
2904 let if_not_exists = self.match_if_not_exists()?;
2905 let name = self.expect_ident()?;
2906 self.expect(Token::Foreign)?;
2907 self.expect(Token::Data)?;
2908 self.expect(Token::Wrapper)?;
2909 let wrapper = self.expect_ident()?;
2910 let options = self.parse_fdw_options_clause()?;
2911 Ok(SqlCommand::CreateServer(CreateServerQuery {
2912 name,
2913 wrapper,
2914 options,
2915 if_not_exists,
2916 }))
2917 } else if self.check(&Token::Foreign) {
2918 self.advance()?;
2922 self.expect(Token::Table)?;
2923 let if_not_exists = self.match_if_not_exists()?;
2924 let name = self.expect_ident()?;
2925 self.expect(Token::LParen)?;
2926 let mut columns = Vec::new();
2927 loop {
2928 let col_name = self.expect_ident()?;
2929 let data_type = self.expect_ident_or_keyword()?;
2930 let mut not_null = false;
2933 if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NOT")) {
2934 self.advance()?;
2935 if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NULL")) {
2936 self.advance()?;
2937 not_null = true;
2938 }
2939 }
2940 columns.push(ForeignColumnDef {
2941 name: col_name,
2942 data_type,
2943 not_null,
2944 });
2945 if !self.consume(&Token::Comma)? {
2946 break;
2947 }
2948 }
2949 self.expect(Token::RParen)?;
2950 self.expect(Token::Server)?;
2951 let server = self.expect_ident()?;
2952 let options = self.parse_fdw_options_clause()?;
2953 Ok(SqlCommand::CreateForeignTable(CreateForeignTableQuery {
2954 name,
2955 server,
2956 columns,
2957 options,
2958 if_not_exists,
2959 }))
2960 } else if self.check(&Token::Sequence) {
2961 self.advance()?;
2964 let if_not_exists = self.match_if_not_exists()?;
2965 let name = self.expect_ident()?;
2966 let mut start: i64 = 1;
2967 let mut increment: i64 = 1;
2968 loop {
2970 if self.consume(&Token::Start)? {
2971 let _ = self.consume(&Token::With)? || self.consume_ident_ci("WITH")?;
2973 start = self.parse_integer()?;
2974 } else if self.consume(&Token::Increment)? {
2975 let _ = self.consume(&Token::By)? || self.consume_ident_ci("BY")?;
2977 increment = self.parse_integer()?;
2978 } else {
2979 break;
2980 }
2981 }
2982 Ok(SqlCommand::CreateSequence(CreateSequenceQuery {
2983 name,
2984 if_not_exists,
2985 start,
2986 increment,
2987 }))
2988 } else if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
2989 self.advance()?; match self.parse_create_migration_body()? {
2991 QueryExpr::CreateMigration(q) => Ok(SqlCommand::CreateMigration(q)),
2992 other => Err(ParseError::new(
2993 format!("internal: CREATE MIGRATION produced unexpected kind {other:?}"),
2994 self.position(),
2995 )),
2996 }
2997 } else if let Some(reason) = analytics_v0_non_goal_create(self.peek()) {
2998 Err(ParseError::new(reason, self.position()))
3006 } else if let Some(err) =
3007 ParseError::unsupported_recognized_token(self.peek(), self.position())
3008 {
3009 Err(err)
3010 } else {
3011 Err(ParseError::expected(
3012 vec![
3013 "TABLE",
3014 "GRAPH",
3015 "VECTOR",
3016 "DOCUMENT",
3017 "KV",
3018 "COLLECTION",
3019 "INDEX",
3020 "UNIQUE",
3021 "METRIC",
3022 "TIMESERIES",
3023 "QUEUE",
3024 "TREE",
3025 "HLL",
3026 "SKETCH",
3027 "FILTER",
3028 "SCHEMA",
3029 "SEQUENCE",
3030 "USER",
3031 "MIGRATION",
3032 ],
3033 self.peek(),
3034 pos,
3035 ))
3036 }
3037 }
3038
3039 pub fn parse_sql_command(&mut self) -> Result<SqlCommand, ParseError> {
3040 match self.peek() {
3041 Token::Select => match self.parse_select_query()? {
3042 QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
3043 QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
3044 other => Err(ParseError::new(
3045 format!("internal: SELECT produced unexpected query kind {other:?}"),
3046 self.position(),
3047 )),
3048 },
3049 Token::From => match self.parse_from_query()? {
3050 QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
3051 QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
3052 other => Err(ParseError::new(
3053 format!("internal: FROM produced unexpected query kind {other:?}"),
3054 self.position(),
3055 )),
3056 },
3057 Token::Insert => match self.parse_insert_query()? {
3058 QueryExpr::Insert(query) => Ok(SqlCommand::Insert(query)),
3059 other => Err(ParseError::new(
3060 format!("internal: INSERT produced unexpected query kind {other:?}"),
3061 self.position(),
3062 )),
3063 },
3064 Token::Update => match self.parse_update_query()? {
3065 QueryExpr::Update(query) => Ok(SqlCommand::Update(query)),
3066 other => Err(ParseError::new(
3067 format!("internal: UPDATE produced unexpected query kind {other:?}"),
3068 self.position(),
3069 )),
3070 },
3071 Token::Delete => {
3072 if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("SECRET"))
3073 {
3074 self.advance()?; self.advance()?; let key =
3077 Self::normalize_secret_admin_path(self.parse_dotted_admin_path(true)?);
3078 Ok(SqlCommand::DeleteSecret { key })
3079 } else {
3080 match self.parse_delete_query()? {
3081 QueryExpr::Delete(query) => Ok(SqlCommand::Delete(query)),
3082 other => Err(ParseError::new(
3083 format!("internal: DELETE produced unexpected query kind {other:?}"),
3084 self.position(),
3085 )),
3086 }
3087 }
3088 }
3089 Token::Truncate => {
3090 self.advance()?;
3091 let model = if self.consume(&Token::Table)? {
3092 Some(CollectionModel::Table)
3093 } else if self.consume(&Token::Graph)? {
3094 Some(CollectionModel::Graph)
3095 } else if self.consume(&Token::Vector)? {
3096 Some(CollectionModel::Vector)
3097 } else if self.consume(&Token::Document)? {
3098 Some(CollectionModel::Document)
3099 } else if self.consume(&Token::Timeseries)? {
3100 Some(CollectionModel::TimeSeries)
3101 } else if self.consume_ident_ci("METRICS")? {
3102 Some(CollectionModel::Metrics)
3103 } else if self.consume(&Token::Kv)? {
3104 Some(CollectionModel::Kv)
3105 } else if self.consume(&Token::Queue)? {
3106 Some(CollectionModel::Queue)
3107 } else if self.consume(&Token::Collection)? {
3108 None
3109 } else {
3110 return Err(ParseError::expected(
3111 vec![
3112 "TABLE",
3113 "GRAPH",
3114 "VECTOR",
3115 "DOCUMENT",
3116 "TIMESERIES",
3117 "METRICS",
3118 "KV",
3119 "QUEUE",
3120 "COLLECTION",
3121 ],
3122 self.peek(),
3123 self.position(),
3124 ));
3125 };
3126 match self.parse_truncate_body(model)? {
3127 QueryExpr::Truncate(query) => Ok(SqlCommand::Truncate(query)),
3128 other => Err(ParseError::new(
3129 format!("internal: TRUNCATE produced unexpected kind {other:?}"),
3130 self.position(),
3131 )),
3132 }
3133 }
3134 Token::Explain => {
3135 if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
3138 {
3139 self.advance()?; match self.parse_explain_migration_after_keyword()? {
3141 QueryExpr::ExplainMigration(q) => Ok(SqlCommand::ExplainMigration(q)),
3142 other => Err(ParseError::new(
3143 format!(
3144 "internal: EXPLAIN MIGRATION produced unexpected kind {other:?}"
3145 ),
3146 self.position(),
3147 )),
3148 }
3149 } else {
3150 match self.parse_explain_alter_query()? {
3151 QueryExpr::ExplainAlter(query) => Ok(SqlCommand::ExplainAlter(query)),
3152 other => Err(ParseError::new(
3153 format!("internal: EXPLAIN produced unexpected query kind {other:?}"),
3154 self.position(),
3155 )),
3156 }
3157 }
3158 }
3159 Token::Create => self.parse_create_command(),
3160 Token::Drop => {
3161 let pos = self.position();
3162 self.advance()?;
3163
3164 let materialized = self.consume(&Token::Materialized)?;
3166 if self.check(&Token::View) {
3167 self.advance()?;
3168 let if_exists = self.match_if_exists()?;
3169 let name = self.expect_ident()?;
3170 return Ok(SqlCommand::DropView(DropViewQuery {
3171 name,
3172 materialized,
3173 if_exists,
3174 }));
3175 }
3176 if materialized {
3177 return Err(ParseError::expected(
3178 vec!["VIEW"],
3179 self.peek(),
3180 self.position(),
3181 ));
3182 }
3183
3184 if self.check(&Token::Index) {
3185 match self.parse_drop_index_query()? {
3186 QueryExpr::DropIndex(query) => Ok(SqlCommand::DropIndex(query)),
3187 other => Err(ParseError::new(
3188 format!("internal: DROP INDEX produced unexpected kind {other:?}"),
3189 self.position(),
3190 )),
3191 }
3192 } else if self.check(&Token::Table) {
3193 self.expect(Token::Table)?;
3194 match self.parse_drop_table_body()? {
3195 QueryExpr::DropTable(query) => Ok(SqlCommand::DropTable(query)),
3196 other => Err(ParseError::new(
3197 format!("internal: DROP TABLE produced unexpected kind {other:?}"),
3198 self.position(),
3199 )),
3200 }
3201 } else if self.check(&Token::Graph) {
3202 self.advance()?;
3203 match self.parse_drop_graph_body()? {
3204 QueryExpr::DropGraph(query) => Ok(SqlCommand::DropGraph(query)),
3205 other => Err(ParseError::new(
3206 format!("internal: DROP GRAPH produced unexpected kind {other:?}"),
3207 self.position(),
3208 )),
3209 }
3210 } else if self.check(&Token::Vector) {
3211 self.advance()?;
3212 match self.parse_drop_vector_body()? {
3213 QueryExpr::DropVector(query) => Ok(SqlCommand::DropVector(query)),
3214 other => Err(ParseError::new(
3215 format!("internal: DROP VECTOR produced unexpected kind {other:?}"),
3216 self.position(),
3217 )),
3218 }
3219 } else if self.check(&Token::Document) {
3220 self.advance()?;
3221 match self.parse_drop_document_body()? {
3222 QueryExpr::DropDocument(query) => Ok(SqlCommand::DropDocument(query)),
3223 other => Err(ParseError::new(
3224 format!("internal: DROP DOCUMENT produced unexpected kind {other:?}"),
3225 self.position(),
3226 )),
3227 }
3228 } else if self.check(&Token::Kv) {
3229 self.advance()?;
3230 match self.parse_drop_kv_body()? {
3231 QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
3232 other => Err(ParseError::new(
3233 format!("internal: DROP KV produced unexpected kind {other:?}"),
3234 self.position(),
3235 )),
3236 }
3237 } else if self.consume_ident_ci("CONFIG")? {
3238 match self.parse_drop_keyed_body(CollectionModel::Config)? {
3239 QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
3240 other => Err(ParseError::new(
3241 format!("internal: DROP CONFIG produced unexpected kind {other:?}"),
3242 self.position(),
3243 )),
3244 }
3245 } else if self.consume_ident_ci("VAULT")? {
3246 match self.parse_drop_keyed_body(CollectionModel::Vault)? {
3247 QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
3248 other => Err(ParseError::new(
3249 format!("internal: DROP VAULT produced unexpected kind {other:?}"),
3250 self.position(),
3251 )),
3252 }
3253 } else if self.check(&Token::Collection) {
3254 self.advance()?;
3255 match self.parse_drop_collection_body()? {
3256 QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
3257 other => Err(ParseError::new(
3258 format!("internal: DROP COLLECTION produced unexpected kind {other:?}"),
3259 self.position(),
3260 )),
3261 }
3262 } else if self.check(&Token::Timeseries) {
3263 self.advance()?;
3264 match self.parse_drop_timeseries_body()? {
3265 QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
3266 other => Err(ParseError::new(
3267 format!("internal: DROP TIMESERIES produced unexpected kind {other:?}"),
3268 self.position(),
3269 )),
3270 }
3271 } else if self.consume_ident_ci("METRICS")? {
3272 match self.parse_drop_collection_model_body(Some(CollectionModel::Metrics))? {
3273 QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
3274 other => Err(ParseError::new(
3275 format!("internal: DROP METRICS produced unexpected kind {other:?}"),
3276 self.position(),
3277 )),
3278 }
3279 } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
3280 {
3281 self.advance()?;
3285 match self.parse_drop_timeseries_body()? {
3286 QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
3287 other => Err(ParseError::new(
3288 format!("internal: DROP HYPERTABLE produced unexpected kind {other:?}"),
3289 self.position(),
3290 )),
3291 }
3292 } else if self.check(&Token::Queue) {
3293 self.advance()?;
3294 match self.parse_drop_queue_body()? {
3295 QueryExpr::DropQueue(query) => Ok(SqlCommand::DropQueue(query)),
3296 other => Err(ParseError::new(
3297 format!("internal: DROP QUEUE produced unexpected kind {other:?}"),
3298 self.position(),
3299 )),
3300 }
3301 } else if self.check(&Token::Tree) {
3302 self.advance()?;
3303 match self.parse_drop_tree_body()? {
3304 QueryExpr::DropTree(query) => Ok(SqlCommand::DropTree(query)),
3305 other => Err(ParseError::new(
3306 format!("internal: DROP TREE produced unexpected kind {other:?}"),
3307 self.position(),
3308 )),
3309 }
3310 } else if matches!(self.peek(), Token::Ident(n) if
3311 n.eq_ignore_ascii_case("HLL") ||
3312 n.eq_ignore_ascii_case("SKETCH") ||
3313 n.eq_ignore_ascii_case("FILTER"))
3314 {
3315 match self.parse_drop_probabilistic()? {
3316 QueryExpr::ProbabilisticCommand(command) => {
3317 Ok(SqlCommand::Probabilistic(command))
3318 }
3319 other => Err(ParseError::new(
3320 format!(
3321 "internal: DROP probabilistic produced unexpected kind {other:?}"
3322 ),
3323 self.position(),
3324 )),
3325 }
3326 } else if self.check(&Token::Schema) {
3327 self.advance()?;
3329 let if_exists = self.match_if_exists()?;
3330 let name = self.expect_ident()?;
3331 let cascade = self.consume(&Token::Cascade)?;
3332 Ok(SqlCommand::DropSchema(DropSchemaQuery {
3333 name,
3334 if_exists,
3335 cascade,
3336 }))
3337 } else if self.check(&Token::Policy) {
3338 self.advance()?;
3342 if matches!(self.peek(), Token::String(_)) {
3343 let expr = self.parse_drop_iam_policy_after_keywords()?;
3344 return Ok(SqlCommand::IamPolicy(expr));
3345 }
3346 let if_exists = self.match_if_exists()?;
3347 let name = self.expect_ident()?;
3348 self.expect(Token::On)?;
3349 let table = self.expect_ident()?;
3350 Ok(SqlCommand::DropPolicy(DropPolicyQuery {
3351 name,
3352 table,
3353 if_exists,
3354 }))
3355 } else if self.check(&Token::Server) {
3356 self.advance()?;
3358 let if_exists = self.match_if_exists()?;
3359 let name = self.expect_ident()?;
3360 let cascade = self.consume(&Token::Cascade)?;
3361 Ok(SqlCommand::DropServer(DropServerQuery {
3362 name,
3363 if_exists,
3364 cascade,
3365 }))
3366 } else if self.check(&Token::Foreign) {
3367 self.advance()?;
3369 self.expect(Token::Table)?;
3370 let if_exists = self.match_if_exists()?;
3371 let name = self.expect_ident()?;
3372 Ok(SqlCommand::DropForeignTable(DropForeignTableQuery {
3373 name,
3374 if_exists,
3375 }))
3376 } else if self.check(&Token::Sequence) {
3377 self.advance()?;
3379 let if_exists = self.match_if_exists()?;
3380 let name = self.expect_ident()?;
3381 Ok(SqlCommand::DropSequence(DropSequenceQuery {
3382 name,
3383 if_exists,
3384 }))
3385 } else if let Some(err) =
3386 ParseError::unsupported_recognized_token(self.peek(), self.position())
3387 {
3388 Err(err)
3389 } else {
3390 Err(ParseError::expected(
3391 vec![
3392 "TABLE",
3393 "INDEX",
3394 "TIMESERIES",
3395 "QUEUE",
3396 "TREE",
3397 "HLL",
3398 "SKETCH",
3399 "FILTER",
3400 "SCHEMA",
3401 "SEQUENCE",
3402 ],
3403 self.peek(),
3404 pos,
3405 ))
3406 }
3407 }
3408 Token::Alter => {
3409 let next = self.peek_next()?.clone();
3414 if matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("USER")) {
3415 self.advance()?; let stmt = self.parse_alter_user_statement()?;
3417 Ok(SqlCommand::AlterUser(stmt))
3418 } else if matches!(next, Token::Queue) {
3419 self.advance()?; self.advance()?; match self.parse_alter_queue_body()? {
3422 QueryExpr::AlterQueue(query) => Ok(SqlCommand::AlterQueue(query)),
3423 other => Err(ParseError::new(
3424 format!("internal: ALTER QUEUE produced unexpected kind {other:?}"),
3425 self.position(),
3426 )),
3427 }
3428 } else if matches!(next, Token::Metric) {
3429 self.advance()?; self.advance()?; match self.parse_alter_metric_body()? {
3432 QueryExpr::AlterMetric(query) => Ok(SqlCommand::AlterMetric(query)),
3433 other => Err(ParseError::new(
3434 format!("internal: ALTER METRIC produced unexpected kind {other:?}"),
3435 self.position(),
3436 )),
3437 }
3438 } else if matches!(next, Token::Graph) {
3439 match self.parse_alter_graph_query()? {
3443 QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
3444 other => Err(ParseError::new(
3445 format!(
3446 "internal: ALTER GRAPH produced unexpected query kind {other:?}"
3447 ),
3448 self.position(),
3449 )),
3450 }
3451 } else if matches!(next, Token::Table)
3452 || matches!(next, Token::Collection)
3453 || matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("COLLECTION"))
3454 {
3455 match self.parse_alter_table_query()? {
3461 QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
3462 other => Err(ParseError::new(
3463 format!(
3464 "internal: ALTER TABLE produced unexpected query kind {other:?}"
3465 ),
3466 self.position(),
3467 )),
3468 }
3469 } else if let Some(err) =
3470 ParseError::unsupported_recognized_token(&next, self.position())
3471 {
3472 Err(err)
3473 } else {
3474 match self.parse_alter_table_query()? {
3475 QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
3476 other => Err(ParseError::new(
3477 format!("internal: ALTER produced unexpected query kind {other:?}"),
3478 self.position(),
3479 )),
3480 }
3481 }
3482 }
3483 Token::Ident(name) if name.eq_ignore_ascii_case("GRANT") => {
3484 let stmt = self.parse_grant_statement()?;
3485 Ok(SqlCommand::Grant(stmt))
3486 }
3487 Token::Ident(name) if name.eq_ignore_ascii_case("REVOKE") => {
3488 let stmt = self.parse_revoke_statement()?;
3489 Ok(SqlCommand::Revoke(stmt))
3490 }
3491 Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
3492 self.advance()?;
3493 if self.consume_ident_ci("BACKFILL")? {
3494 return Err(ParseError::new(
3495 "EVENTS BACKFILL STATUS is not implemented; EVENTS BACKFILL runtime is available but durable progress tracking is not"
3496 .to_string(),
3497 self.position(),
3498 ));
3499 }
3500 if !self.consume_ident_ci("STATUS")? {
3501 return Err(ParseError::expected(
3502 vec!["STATUS"],
3503 self.peek(),
3504 self.position(),
3505 ));
3506 }
3507
3508 let mut query = TableQuery::new("red.subscriptions");
3509 let collection = match self.peek().clone() {
3510 Token::Ident(name) => {
3511 self.advance()?;
3512 Some(name)
3513 }
3514 Token::String(name) => {
3515 self.advance()?;
3516 Some(name)
3517 }
3518 _ => None,
3519 };
3520 self.parse_table_clauses(&mut query)?;
3521 if let Some(collection) = collection {
3522 let filter = Filter::compare(
3523 FieldRef::column("red.subscriptions", "collection"),
3524 CompareOp::Eq,
3525 Value::text(collection),
3526 );
3527 let expr = filter_to_expr(&filter);
3528 query.where_expr = Some(match query.where_expr.take() {
3529 Some(existing) => Expr::binop(BinOp::And, existing, expr),
3530 None => expr,
3531 });
3532 query.filter = Some(match query.filter.take() {
3533 Some(existing) => existing.and(filter),
3534 None => filter,
3535 });
3536 }
3537 Ok(SqlCommand::Select(query))
3538 }
3539 Token::Attach => {
3540 let expr = self.parse_attach_policy()?;
3541 Ok(SqlCommand::IamPolicy(expr))
3542 }
3543 Token::Detach => {
3544 let expr = self.parse_detach_policy()?;
3545 Ok(SqlCommand::IamPolicy(expr))
3546 }
3547 Token::Ident(name) if name.eq_ignore_ascii_case("SIMULATE") => {
3548 let expr = self.parse_simulate_policy()?;
3549 Ok(SqlCommand::IamPolicy(expr))
3550 }
3551 Token::Ident(name) if name.eq_ignore_ascii_case("LINT") => {
3552 let expr = self.parse_lint_policy()?;
3553 Ok(SqlCommand::IamPolicy(expr))
3554 }
3555 Token::Ident(name) if name.eq_ignore_ascii_case("MIGRATE") => {
3556 let next = self.peek_next()?.clone();
3560 let is_policy_mode = matches!(&next, Token::Policy)
3561 || matches!(&next, Token::Ident(name)
3562 if name.eq_ignore_ascii_case("POLICY"));
3563 if is_policy_mode {
3564 let expr = self.parse_migrate_policy_mode()?;
3565 return Ok(SqlCommand::IamPolicy(expr));
3566 }
3567 Err(ParseError::expected(
3568 vec!["POLICY"],
3569 self.peek(),
3570 self.position(),
3571 ))
3572 }
3573 Token::Set => {
3574 self.advance()?;
3575 if self.consume_ident_ci("CONFIG")? {
3576 let full_key = self.parse_dotted_admin_path(true)?;
3577 self.expect(Token::Eq)?;
3578 let value = self.parse_literal_value()?;
3579 Ok(SqlCommand::SetConfig {
3580 key: full_key,
3581 value,
3582 })
3583 } else if self.consume_ident_ci("SECRET")? {
3584 let key =
3585 Self::normalize_secret_admin_path(self.parse_dotted_admin_path(true)?);
3586 self.expect(Token::Eq)?;
3587 let value = self.parse_literal_value()?;
3588 Ok(SqlCommand::SetSecret { key, value })
3589 } else if self.consume_ident_ci("TENANT")? {
3590 let _ = self.consume(&Token::Eq)?;
3593 if self.consume_ident_ci("NULL")? {
3594 Ok(SqlCommand::SetTenant(None))
3595 } else {
3596 let value = self.parse_literal_value()?;
3597 match value {
3598 Value::Text(s) => Ok(SqlCommand::SetTenant(Some(s.to_string()))),
3599 Value::Null => Ok(SqlCommand::SetTenant(None)),
3600 other => Err(ParseError::new(
3601 format!("SET TENANT expects a text literal or NULL, got {other:?}"),
3602 self.position(),
3603 )),
3604 }
3605 }
3606 } else {
3607 Err(ParseError::expected(
3608 vec!["CONFIG", "SECRET", "TENANT"],
3609 self.peek(),
3610 self.position(),
3611 ))
3612 }
3613 }
3614 Token::Ident(name) if name.eq_ignore_ascii_case("APPLY") => {
3615 self.advance()?;
3616 match self.parse_apply_migration()? {
3617 QueryExpr::ApplyMigration(q) => Ok(SqlCommand::ApplyMigration(q)),
3618 other => Err(ParseError::new(
3619 format!("internal: APPLY MIGRATION produced unexpected kind {other:?}"),
3620 self.position(),
3621 )),
3622 }
3623 }
3624 Token::Ident(name) if name.eq_ignore_ascii_case("RESET") => {
3625 self.advance()?;
3627 if self.consume_ident_ci("TENANT")? {
3628 Ok(SqlCommand::SetTenant(None))
3629 } else {
3630 Err(ParseError::expected(
3631 vec!["TENANT"],
3632 self.peek(),
3633 self.position(),
3634 ))
3635 }
3636 }
3637 Token::Ident(name)
3638 if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
3639 {
3640 self.advance()?;
3641 let collection = self.parse_dotted_admin_path(false)?;
3642 let mut query = TableQuery::new("red.describe");
3643 query.filter = Some(Filter::compare(
3644 FieldRef::column("", "collection"),
3645 CompareOp::Eq,
3646 Value::text(collection),
3647 ));
3648 Ok(SqlCommand::Select(query))
3649 }
3650 Token::Desc => {
3651 self.advance()?;
3652 let collection = self.parse_dotted_admin_path(false)?;
3653 let mut query = TableQuery::new("red.describe");
3654 query.filter = Some(Filter::compare(
3655 FieldRef::column("", "collection"),
3656 CompareOp::Eq,
3657 Value::text(collection),
3658 ));
3659 Ok(SqlCommand::Select(query))
3660 }
3661 Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
3662 self.advance()?;
3663 if self.consume(&Token::Create)? || self.consume_ident_ci("CREATE")? {
3664 if !(self.consume(&Token::Table)? || self.consume_ident_ci("TABLE")?) {
3665 return Err(ParseError::expected(
3666 vec!["TABLE"],
3667 self.peek(),
3668 self.position(),
3669 ));
3670 }
3671 let collection = self.parse_dotted_admin_path(false)?;
3672 let mut query = TableQuery::new("red.show_create");
3673 query.filter = Some(Filter::compare(
3674 FieldRef::column("", "collection"),
3675 CompareOp::Eq,
3676 Value::text(collection),
3677 ));
3678 Ok(SqlCommand::Select(query))
3679 } else if self.consume_ident_ci("CONFIG")? {
3680 let prefix = if !(self.check(&Token::Eof)
3684 || self.check(&Token::As)
3685 || self.check(&Token::Format))
3686 {
3687 let first = self.expect_ident()?;
3688 let mut full = first;
3689 while self.consume(&Token::Dot)? {
3690 let next = self.expect_ident_or_keyword()?;
3691 full = format!("{full}.{next}");
3692 }
3693 Some(full.to_ascii_lowercase())
3696 } else {
3697 None
3698 };
3699 let as_json = if self.consume(&Token::As)? || self.consume(&Token::Format)? {
3700 if !self.consume(&Token::Json)? {
3701 return Err(ParseError::expected(
3702 vec!["JSON"],
3703 self.peek(),
3704 self.position(),
3705 ));
3706 }
3707 true
3708 } else {
3709 false
3710 };
3711 Ok(SqlCommand::ShowConfig { prefix, as_json })
3712 } else if self.consume_ident_ci("COLLECTIONS")? {
3713 let mut query = TableQuery::new("red.collections");
3714 let include_internal = if self.consume_ident_ci("INCLUDING")? {
3715 if !self.consume_ident_ci("INTERNAL")? {
3716 return Err(ParseError::expected(
3717 vec!["INTERNAL"],
3718 self.peek(),
3719 self.position(),
3720 ));
3721 }
3722 true
3723 } else {
3724 false
3725 };
3726 self.parse_table_clauses(&mut query)?;
3727 if !include_internal {
3728 let user_filter = query.filter.take();
3729 let hide_internal = crate::ast::Filter::Compare {
3730 field: FieldRef::column("", "internal"),
3731 op: CompareOp::Eq,
3732 value: Value::Boolean(false),
3733 };
3734 query.filter = Some(match user_filter {
3735 Some(filter) => filter.and(hide_internal),
3736 None => hide_internal,
3737 });
3738 }
3739 Ok(SqlCommand::Select(query))
3740 } else if self.consume_ident_ci("TABLES")? {
3741 Ok(SqlCommand::Select(parse_show_collections_by_model(
3742 self, "table",
3743 )?))
3744 } else if self.consume_ident_ci("QUEUES")? {
3745 let mut query = TableQuery::new("red.queues");
3753 let include_internal = if self.consume_ident_ci("INCLUDING")? {
3754 if !self.consume_ident_ci("INTERNAL")? {
3755 return Err(ParseError::expected(
3756 vec!["INTERNAL"],
3757 self.peek(),
3758 self.position(),
3759 ));
3760 }
3761 true
3762 } else {
3763 false
3764 };
3765 self.parse_table_clauses(&mut query)?;
3766 if !include_internal {
3767 let hide_internal = Filter::Compare {
3768 field: FieldRef::column("", "internal"),
3769 op: CompareOp::Eq,
3770 value: Value::Boolean(false),
3771 };
3772 add_table_filter(&mut query, hide_internal);
3773 }
3774 Ok(SqlCommand::Select(query))
3775 } else if self.consume(&Token::Vectors)? || self.consume_ident_ci("VECTORS")? {
3776 Ok(SqlCommand::Select(parse_show_collections_by_model(
3777 self, "vector",
3778 )?))
3779 } else if self.consume_ident_ci("DOCUMENTS")? {
3780 Ok(SqlCommand::Select(parse_show_collections_by_model(
3781 self, "document",
3782 )?))
3783 } else if self.consume(&Token::Timeseries)?
3784 || self.consume_ident_ci("TIMESERIES")?
3785 {
3786 Ok(SqlCommand::Select(parse_show_collections_by_model(
3787 self,
3788 "timeseries",
3789 )?))
3790 } else if self.consume_ident_ci("GRAPHS")? {
3791 Ok(SqlCommand::Select(parse_show_collections_by_model(
3792 self, "graph",
3793 )?))
3794 } else if self.consume_ident_ci("CONFIGS")? {
3795 Ok(SqlCommand::Select(parse_show_collections_by_model(
3796 self, "config",
3797 )?))
3798 } else if self.consume_ident_ci("VAULTS")? {
3799 Ok(SqlCommand::Select(parse_show_collections_by_model(
3800 self, "vault",
3801 )?))
3802 } else if self.consume(&Token::Kv)?
3803 || self.consume_ident_ci("KV")?
3804 || self.consume_ident_ci("KVS")?
3805 {
3806 Ok(SqlCommand::Select(parse_show_collections_by_model(
3807 self, "kv",
3808 )?))
3809 } else if self.consume(&Token::Schema)? || self.consume_ident_ci("SCHEMA")? {
3810 let collection = self.parse_dotted_admin_path(false)?;
3811 let mut query = TableQuery::new("red.columns");
3812 query.filter = Some(Filter::compare(
3813 FieldRef::column("", "collection"),
3814 CompareOp::Eq,
3815 Value::text(collection),
3816 ));
3817 Ok(SqlCommand::Select(query))
3818 } else if self.consume_ident_ci("INDICES")? || self.consume_ident_ci("INDEXES")? {
3819 let mut query = TableQuery::new("red.show_indexes");
3820 if self.consume(&Token::On)? {
3821 let collection = self.expect_ident_or_keyword()?;
3822 let filter = Filter::Compare {
3823 field: FieldRef::column("", "table"),
3824 op: CompareOp::Eq,
3825 value: Value::text(collection),
3826 };
3827 query.where_expr = Some(filter_to_expr(&filter));
3828 query.filter = Some(filter);
3829 }
3830 self.parse_table_clauses(&mut query)?;
3831 Ok(SqlCommand::Select(query))
3832 } else if self.consume_ident_ci("POLICIES")? {
3833 if self.consume(&Token::For)? || self.consume_ident_ci("FOR")? {
3834 let principal = self.parse_iam_principal_kind()?;
3835 return Ok(SqlCommand::IamPolicy(QueryExpr::ShowPolicies {
3836 filter: Some(principal),
3837 }));
3838 }
3839 let mut query = TableQuery::new("red.policies");
3840 let collection_filter =
3841 if self.consume(&Token::On)? || self.consume_ident_ci("ON")? {
3842 let collection = self.parse_dotted_admin_path(false)?;
3843 Some(Filter::Compare {
3844 field: FieldRef::TableColumn {
3845 table: String::new(),
3846 column: "collection".to_string(),
3847 },
3848 op: CompareOp::Eq,
3849 value: Value::text(collection),
3850 })
3851 } else {
3852 None
3853 };
3854 self.parse_table_clauses(&mut query)?;
3855 if let Some(collection_filter) = collection_filter {
3856 let combined = match query.filter.take() {
3857 Some(existing) => {
3858 Filter::And(Box::new(collection_filter), Box::new(existing))
3859 }
3860 None => collection_filter,
3861 };
3862 query.where_expr = Some(filter_to_expr(&combined));
3863 query.filter = Some(combined);
3864 }
3865 Ok(SqlCommand::Select(query))
3866 } else if self.consume_ident_ci("STATS")? {
3867 let mut query = TableQuery::new("red.stats");
3868 let collection = match self.peek().clone() {
3869 Token::Ident(name) => {
3870 self.advance()?;
3871 Some(name)
3872 }
3873 Token::String(name) => {
3874 self.advance()?;
3875 Some(name)
3876 }
3877 _ => None,
3878 };
3879 self.parse_table_clauses(&mut query)?;
3880 if let Some(collection) = collection {
3881 let filter = Filter::compare(
3882 FieldRef::column("red.stats", "collection"),
3883 CompareOp::Eq,
3884 Value::text(collection),
3885 );
3886 let expr = filter_to_expr(&filter);
3887 query.where_expr = Some(match query.where_expr.take() {
3888 Some(existing) => Expr::binop(BinOp::And, existing, expr),
3889 None => expr,
3890 });
3891 query.filter = Some(match query.filter.take() {
3892 Some(existing) => existing.and(filter),
3893 None => filter,
3894 });
3895 }
3896 Ok(SqlCommand::Select(query))
3897 } else if self.consume_ident_ci("SAMPLE")? {
3898 let mut query = TableQuery::new(&self.expect_ident()?);
3899 query.limit = if self.consume(&Token::Limit)? {
3900 Some(self.parse_integer()? as u64)
3901 } else {
3902 Some(10)
3903 };
3904 Ok(SqlCommand::Select(query))
3905 } else if self.consume_ident_ci("SECRET")? || self.consume_ident_ci("SECRETS")? {
3906 let prefix = if !self.check(&Token::Eof) {
3907 Some(Self::normalize_secret_admin_path(
3908 self.parse_dotted_admin_path(true)?,
3909 ))
3910 } else {
3911 None
3912 };
3913 Ok(SqlCommand::ShowSecrets { prefix })
3914 } else if self.consume_ident_ci("TENANT")? {
3915 Ok(SqlCommand::ShowTenant)
3916 } else if let Some(expr) = self.parse_show_iam_after_show()? {
3917 Ok(SqlCommand::IamPolicy(expr))
3918 } else {
3919 Err(ParseError::expected(
3920 vec![
3921 "CONFIG",
3922 "SECRET",
3923 "SECRETS",
3924 "COLLECTIONS",
3925 "TABLES",
3926 "QUEUES",
3927 "VECTORS",
3928 "DOCUMENTS",
3929 "TIMESERIES",
3930 "GRAPHS",
3931 "KV",
3932 "SCHEMA",
3933 "INDICES",
3934 "INDEXES",
3935 "SAMPLE",
3936 "POLICIES",
3937 "STATS",
3938 "TENANT",
3939 "EFFECTIVE",
3940 ],
3941 self.peek(),
3942 self.position(),
3943 ))
3944 }
3945 }
3946 Token::Begin | Token::Start => {
3958 self.advance()?;
3959 let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
3960 if self.consume_ident_ci("ISOLATION")? {
3962 self.expect(Token::Level)?;
3963 let mut parts: Vec<String> = Vec::new();
3967 if self.consume_ident_ci("READ")? {
3968 parts.push("READ".to_string());
3969 if self.consume_ident_ci("UNCOMMITTED")? {
3970 parts.push("UNCOMMITTED".to_string());
3971 } else if self.consume_ident_ci("COMMITTED")? {
3972 parts.push("COMMITTED".to_string());
3973 } else {
3974 return Err(ParseError::expected(
3975 vec!["UNCOMMITTED", "COMMITTED"],
3976 self.peek(),
3977 self.position(),
3978 ));
3979 }
3980 } else if self.consume_ident_ci("REPEATABLE")? {
3981 parts.push("REPEATABLE".to_string());
3982 if !self.consume_ident_ci("READ")? {
3983 return Err(ParseError::expected(
3984 vec!["READ"],
3985 self.peek(),
3986 self.position(),
3987 ));
3988 }
3989 parts.push("READ".to_string());
3990 } else if self.consume_ident_ci("SNAPSHOT")? {
3991 parts.push("SNAPSHOT".to_string());
3992 } else if self.consume_ident_ci("SERIALIZABLE")? {
3993 return Err(ParseError::new(
3994 "ISOLATION LEVEL SERIALIZABLE is not yet supported — reddb \
3995 currently provides SNAPSHOT ISOLATION (which PG calls \
3996 REPEATABLE READ). Use REPEATABLE READ / SNAPSHOT / \
3997 READ COMMITTED, or omit ISOLATION LEVEL for the default."
3998 .to_string(),
3999 self.position(),
4000 ));
4001 } else {
4002 return Err(ParseError::expected(
4003 vec!["READ", "REPEATABLE", "SNAPSHOT", "SERIALIZABLE"],
4004 self.peek(),
4005 self.position(),
4006 ));
4007 }
4008 let _ = parts;
4010 }
4011 Ok(SqlCommand::TransactionControl(TxnControl::Begin))
4012 }
4013 Token::Commit => {
4015 self.advance()?;
4016 let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
4017 Ok(SqlCommand::TransactionControl(TxnControl::Commit))
4018 }
4019 Token::Rollback => {
4022 self.advance()?;
4023 if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
4024 match self.parse_rollback_migration_after_keyword()? {
4025 QueryExpr::RollbackMigration(q) => Ok(SqlCommand::RollbackMigration(q)),
4026 other => Err(ParseError::new(
4027 format!(
4028 "internal: ROLLBACK MIGRATION produced unexpected kind {other:?}"
4029 ),
4030 self.position(),
4031 )),
4032 }
4033 } else {
4034 let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
4035 if self.consume(&Token::To)? {
4036 let _ = self.consume(&Token::Savepoint)?;
4037 let name = self.expect_ident()?;
4038 Ok(SqlCommand::TransactionControl(
4039 TxnControl::RollbackToSavepoint(name),
4040 ))
4041 } else {
4042 Ok(SqlCommand::TransactionControl(TxnControl::Rollback))
4043 }
4044 }
4045 }
4046 Token::Savepoint => {
4048 self.advance()?;
4049 let name = self.expect_ident()?;
4050 Ok(SqlCommand::TransactionControl(TxnControl::Savepoint(name)))
4051 }
4052 Token::Release => {
4054 self.advance()?;
4055 let _ = self.consume(&Token::Savepoint)?;
4056 let name = self.expect_ident()?;
4057 Ok(SqlCommand::TransactionControl(
4058 TxnControl::ReleaseSavepoint(name),
4059 ))
4060 }
4061 Token::Vacuum => {
4063 self.advance()?;
4064 let full = self.consume(&Token::Full)?;
4065 let target = if self.check(&Token::Eof) {
4066 None
4067 } else {
4068 Some(self.expect_ident()?)
4069 };
4070 Ok(SqlCommand::Maintenance(MaintenanceCommand::Vacuum {
4071 target,
4072 full,
4073 }))
4074 }
4075 Token::Refresh => {
4077 self.advance()?;
4078 self.expect(Token::Materialized)?;
4079 self.expect(Token::View)?;
4080 let name = self.expect_ident()?;
4081 Ok(SqlCommand::RefreshMaterializedView(
4082 RefreshMaterializedViewQuery { name },
4083 ))
4084 }
4085 Token::Analyze => {
4087 self.advance()?;
4088 let target = if self.check(&Token::Eof) {
4089 None
4090 } else {
4091 Some(self.expect_ident()?)
4092 };
4093 Ok(SqlCommand::Maintenance(MaintenanceCommand::Analyze {
4094 target,
4095 }))
4096 }
4097 Token::Copy => {
4103 self.advance()?;
4104 let table = self.expect_ident()?;
4105 self.expect(Token::From)?;
4106 let path = self.parse_string()?;
4107
4108 let mut delimiter: Option<char> = None;
4109 let mut has_header = false;
4110 let format = CopyFormat::Csv;
4111
4112 if self.consume(&Token::With)? || self.consume_ident_ci("WITH")? {
4116 self.expect(Token::LParen)?;
4117 loop {
4118 if self.consume(&Token::Format)? || self.consume_ident_ci("FORMAT")? {
4119 let _ = self.consume(&Token::Eq)?;
4120 let _ = self.expect_ident()?;
4122 } else if self.consume(&Token::Header)? {
4123 let _ = self.consume(&Token::Eq)?;
4124 has_header = match self.peek().clone() {
4127 Token::True => {
4128 self.advance()?;
4129 true
4130 }
4131 Token::False => {
4132 self.advance()?;
4133 false
4134 }
4135 Token::Ident(ref n) if n.eq_ignore_ascii_case("true") => {
4136 self.advance()?;
4137 true
4138 }
4139 Token::Ident(ref n) if n.eq_ignore_ascii_case("false") => {
4140 self.advance()?;
4141 false
4142 }
4143 _ => true,
4144 };
4145 } else if self.consume(&Token::Delimiter)? {
4146 let _ = self.consume(&Token::Eq)?;
4147 let s = self.parse_string()?;
4148 delimiter = s.chars().next();
4149 } else {
4150 break;
4151 }
4152 if !self.consume(&Token::Comma)? {
4153 break;
4154 }
4155 }
4156 self.expect(Token::RParen)?;
4157 }
4158
4159 loop {
4161 if self.consume(&Token::Delimiter)? {
4162 let s = self.parse_string()?;
4163 delimiter = s.chars().next();
4164 } else if self.consume(&Token::Header)? {
4165 has_header = true;
4166 } else {
4167 break;
4168 }
4169 }
4170
4171 Ok(SqlCommand::CopyFrom(CopyFromQuery {
4172 table,
4173 path,
4174 format,
4175 delimiter,
4176 has_header,
4177 }))
4178 }
4179 other => Err(ParseError::expected(
4180 vec![
4181 "SELECT",
4182 "FROM",
4183 "INSERT",
4184 "UPDATE",
4185 "DELETE",
4186 "EXPLAIN",
4187 "CREATE",
4188 "DROP",
4189 "ALTER",
4190 "SET",
4191 "SHOW",
4192 "BEGIN",
4193 "COMMIT",
4194 "ROLLBACK",
4195 "SAVEPOINT",
4196 "RELEASE",
4197 "START",
4198 "VACUUM",
4199 "ANALYZE",
4200 "COPY",
4201 "REFRESH",
4202 "DESCRIBE",
4203 "DESC",
4204 ],
4205 other,
4206 self.position(),
4207 )),
4208 }
4209 }
4210}