1use crate::catalog::CollectionModel;
2use crate::storage::query::ast::{
3 AlterMetricQuery, AlterQueueQuery, AlterTableQuery, AlterUserStmt, ApplyMigrationQuery,
4 AskQuery, BinOp, CompareOp, ConfigCommand, CopyFormat, CopyFromQuery, CreateCollectionQuery,
5 CreateForeignTableQuery, CreateIndexQuery, CreateMetricQuery, CreateMigrationQuery,
6 CreatePolicyQuery, CreateQueueQuery, CreateSchemaQuery, CreateSequenceQuery, CreateServerQuery,
7 CreateSloQuery, CreateTableQuery, CreateTimeSeriesQuery, CreateTreeQuery, CreateVectorQuery,
8 CreateViewQuery, DeleteQuery, DropCollectionQuery, DropDocumentQuery, DropForeignTableQuery,
9 DropGraphQuery, DropIndexQuery, DropKvQuery, DropPolicyQuery, DropQueueQuery, DropSchemaQuery,
10 DropSequenceQuery, DropServerQuery, DropTableQuery, DropTimeSeriesQuery, DropTreeQuery,
11 DropVectorQuery, DropViewQuery, EventsBackfillQuery, ExplainAlterQuery, ExplainMigrationQuery,
12 Expr, FieldRef, Filter, ForeignColumnDef, GrantStmt, GraphCommand, GraphQuery, HybridQuery,
13 InsertQuery, JoinQuery, KvCommand, MaintenanceCommand, PathQuery, PolicyAction,
14 ProbabilisticCommand, QueryExpr, QueueCommand, QueueSelectQuery, RankOfQuery, RankRangeQuery,
15 RefreshMaterializedViewQuery, RevokeStmt, RollbackMigrationQuery, SearchCommand, Span,
16 TableQuery, TreeCommand, TruncateQuery, TxnControl, UpdateQuery, VectorQuery,
17};
18use crate::storage::query::parser::{ParseError, Parser, SafeTokenDisplay};
19use crate::storage::query::sql_lowering::filter_to_expr;
20use crate::storage::query::Token;
21use crate::storage::schema::Value;
22
23#[derive(Debug, Clone)]
28pub enum SqlStatement {
29 Query(SqlQuery),
30 Mutation(SqlMutation),
31 Schema(SqlSchemaCommand),
32 Admin(SqlAdminCommand),
33}
34
35#[derive(Debug, Clone)]
36#[allow(clippy::large_enum_variant)]
37pub enum FrontendStatement {
38 Sql(SqlStatement),
39 Graph(GraphQuery),
40 GraphCommand(GraphCommand),
41 Path(PathQuery),
42 Vector(VectorQuery),
43 Hybrid(HybridQuery),
44 Search(SearchCommand),
45 Ask(AskQuery),
46 QueueSelect(QueueSelectQuery),
47 QueueCommand(QueueCommand),
48 EventsBackfill(EventsBackfillQuery),
49 EventsBackfillStatus { collection: String },
50 TreeCommand(TreeCommand),
51 ProbabilisticCommand(ProbabilisticCommand),
52 KvCommand(KvCommand),
53 ConfigCommand(ConfigCommand),
54 Ranking(QueryExpr),
55}
56
57#[derive(Debug, Clone)]
58pub enum SqlCommand {
59 Select(TableQuery),
60 Join(JoinQuery),
61 Insert(InsertQuery),
62 Update(UpdateQuery),
63 Delete(DeleteQuery),
64 ExplainAlter(ExplainAlterQuery),
65 CreateTable(CreateTableQuery),
66 CreateCollection(CreateCollectionQuery),
67 CreateVector(CreateVectorQuery),
68 DropTable(DropTableQuery),
69 DropGraph(DropGraphQuery),
70 DropVector(DropVectorQuery),
71 DropDocument(DropDocumentQuery),
72 DropKv(DropKvQuery),
73 DropCollection(DropCollectionQuery),
74 Truncate(TruncateQuery),
75 AlterTable(AlterTableQuery),
76 CreateIndex(CreateIndexQuery),
77 DropIndex(DropIndexQuery),
78 CreateTimeSeries(CreateTimeSeriesQuery),
79 CreateMetric(CreateMetricQuery),
80 AlterMetric(AlterMetricQuery),
81 CreateSlo(CreateSloQuery),
82 DropTimeSeries(DropTimeSeriesQuery),
83 CreateQueue(CreateQueueQuery),
84 AlterQueue(AlterQueueQuery),
85 DropQueue(DropQueueQuery),
86 CreateTree(CreateTreeQuery),
87 DropTree(DropTreeQuery),
88 Probabilistic(ProbabilisticCommand),
89 SetConfig {
90 key: String,
91 value: Value,
92 },
93 ShowConfig {
94 prefix: Option<String>,
95 },
96 SetSecret {
97 key: String,
98 value: Value,
99 },
100 DeleteSecret {
101 key: String,
102 },
103 ShowSecrets {
104 prefix: Option<String>,
105 },
106 SetTenant(Option<String>),
107 ShowTenant,
108 TransactionControl(TxnControl),
109 Maintenance(MaintenanceCommand),
110 CreateSchema(CreateSchemaQuery),
111 DropSchema(DropSchemaQuery),
112 CreateSequence(CreateSequenceQuery),
113 DropSequence(DropSequenceQuery),
114 CopyFrom(CopyFromQuery),
115 CreateView(CreateViewQuery),
116 DropView(DropViewQuery),
117 RefreshMaterializedView(RefreshMaterializedViewQuery),
118 CreatePolicy(CreatePolicyQuery),
119 DropPolicy(DropPolicyQuery),
120 CreateServer(CreateServerQuery),
121 DropServer(DropServerQuery),
122 CreateForeignTable(CreateForeignTableQuery),
123 DropForeignTable(DropForeignTableQuery),
124 Grant(GrantStmt),
126 Revoke(RevokeStmt),
128 AlterUser(AlterUserStmt),
130 IamPolicy(QueryExpr),
135 CreateMigration(CreateMigrationQuery),
136 ApplyMigration(ApplyMigrationQuery),
137 RollbackMigration(RollbackMigrationQuery),
138 ExplainMigration(ExplainMigrationQuery),
139}
140
141fn analytics_v0_non_goal_create(token: &Token) -> Option<String> {
150 let ident = match token {
151 Token::Ident(s) => s,
152 _ => return None,
153 };
154 let upper = ident.to_ascii_uppercase();
155 let message = match upper.as_str() {
156 "ANALYTICS" => {
157 "CREATE ANALYTICS is not supported in Analytics v0 — \
158 use CREATE METRIC <dotted.path> for the metric-centric \
159 catalog (PRD #782 non-goal)"
160 }
161 "EVENT" => {
162 "CREATE EVENT is not supported in Analytics v0 — \
163 event-shaped data lives in ordinary TABLE/DOCUMENT \
164 collections, not a new storage model (PRD #782 non-goal)"
165 }
166 "COHORT" => {
167 "CREATE COHORT is not supported in Analytics v0 — \
168 cohort surfaces are deferred (PRD #782 non-goal)"
169 }
170 "FUNNEL" => {
171 "CREATE FUNNEL is not supported in Analytics v0 — \
172 funnel surfaces are deferred (PRD #782 non-goal)"
173 }
174 "SLA" => {
175 "CREATE SLA is not supported in Analytics v0 — \
176 SLA/legal/commercial contract modeling is post-MVP \
177 (PRD #782 non-goal)"
178 }
179 "ADAPTER" => {
180 "CREATE ADAPTER is not supported in Analytics v0 — \
181 Prometheus/Grafana/Snowplow/Google Analytics adapters \
182 are deferred (PRD #782 non-goal)"
183 }
184 _ => return None,
185 };
186 Some(message.to_string())
187}
188
189fn collection_model_filter(model: &str) -> Filter {
190 Filter::Compare {
191 field: FieldRef::column("", "model"),
192 op: CompareOp::Eq,
193 value: Value::Text(model.to_string().into()),
194 }
195}
196
197fn add_table_filter(query: &mut TableQuery, filter: Filter) {
198 let combined = match query.filter.take() {
199 Some(existing) => existing.and(filter),
200 None => filter,
201 };
202 query.where_expr = Some(filter_to_expr(&combined));
203 query.filter = Some(combined);
204}
205
206fn parse_show_collections_by_model(
207 parser: &mut Parser<'_>,
208 model: &str,
209) -> Result<TableQuery, ParseError> {
210 let mut query = TableQuery::new("red.collections");
211 parser.parse_table_clauses(&mut query)?;
212 add_table_filter(&mut query, collection_model_filter(model));
213 Ok(query)
214}
215
216#[derive(Debug, Clone)]
217#[allow(clippy::large_enum_variant)]
218pub enum SqlQuery {
219 Select(TableQuery),
220 Join(JoinQuery),
221}
222
223#[derive(Debug, Clone)]
224pub enum SqlMutation {
225 Insert(InsertQuery),
226 Update(UpdateQuery),
227 Delete(DeleteQuery),
228}
229
230#[derive(Debug, Clone)]
231pub enum SqlSchemaCommand {
232 ExplainAlter(ExplainAlterQuery),
233 CreateTable(CreateTableQuery),
234 CreateCollection(CreateCollectionQuery),
235 CreateVector(CreateVectorQuery),
236 DropTable(DropTableQuery),
237 DropGraph(DropGraphQuery),
238 DropVector(DropVectorQuery),
239 DropDocument(DropDocumentQuery),
240 DropKv(DropKvQuery),
241 DropCollection(DropCollectionQuery),
242 Truncate(TruncateQuery),
243 AlterTable(AlterTableQuery),
244 CreateIndex(CreateIndexQuery),
245 DropIndex(DropIndexQuery),
246 CreateTimeSeries(CreateTimeSeriesQuery),
247 CreateMetric(CreateMetricQuery),
248 AlterMetric(AlterMetricQuery),
249 CreateSlo(CreateSloQuery),
250 DropTimeSeries(DropTimeSeriesQuery),
251 CreateQueue(CreateQueueQuery),
252 AlterQueue(AlterQueueQuery),
253 DropQueue(DropQueueQuery),
254 CreateTree(CreateTreeQuery),
255 DropTree(DropTreeQuery),
256 Probabilistic(ProbabilisticCommand),
257 CreateSchema(CreateSchemaQuery),
258 DropSchema(DropSchemaQuery),
259 CreateSequence(CreateSequenceQuery),
260 DropSequence(DropSequenceQuery),
261 CopyFrom(CopyFromQuery),
262 CreateView(CreateViewQuery),
263 DropView(DropViewQuery),
264 RefreshMaterializedView(RefreshMaterializedViewQuery),
265 CreatePolicy(CreatePolicyQuery),
266 DropPolicy(DropPolicyQuery),
267 CreateServer(CreateServerQuery),
268 DropServer(DropServerQuery),
269 CreateForeignTable(CreateForeignTableQuery),
270 DropForeignTable(DropForeignTableQuery),
271 CreateMigration(CreateMigrationQuery),
272 ApplyMigration(ApplyMigrationQuery),
273 RollbackMigration(RollbackMigrationQuery),
274 ExplainMigration(ExplainMigrationQuery),
275}
276
277#[derive(Debug, Clone)]
278#[allow(clippy::large_enum_variant)]
279pub enum SqlAdminCommand {
280 SetConfig { key: String, value: Value },
281 ShowConfig { prefix: Option<String> },
282 SetSecret { key: String, value: Value },
283 DeleteSecret { key: String },
284 ShowSecrets { prefix: Option<String> },
285 SetTenant(Option<String>),
286 ShowTenant,
287 TransactionControl(TxnControl),
288 Maintenance(MaintenanceCommand),
289 Grant(GrantStmt),
290 Revoke(RevokeStmt),
291 AlterUser(AlterUserStmt),
292 IamPolicy(QueryExpr),
293}
294
295impl SqlStatement {
296 pub fn into_command(self) -> SqlCommand {
297 match self {
298 SqlStatement::Query(SqlQuery::Select(query)) => SqlCommand::Select(query),
299 SqlStatement::Query(SqlQuery::Join(query)) => SqlCommand::Join(query),
300 SqlStatement::Mutation(SqlMutation::Insert(query)) => SqlCommand::Insert(query),
301 SqlStatement::Mutation(SqlMutation::Update(query)) => SqlCommand::Update(query),
302 SqlStatement::Mutation(SqlMutation::Delete(query)) => SqlCommand::Delete(query),
303 SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query)) => {
304 SqlCommand::ExplainAlter(query)
305 }
306 SqlStatement::Schema(SqlSchemaCommand::CreateTable(query)) => {
307 SqlCommand::CreateTable(query)
308 }
309 SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query)) => {
310 SqlCommand::CreateCollection(query)
311 }
312 SqlStatement::Schema(SqlSchemaCommand::CreateVector(query)) => {
313 SqlCommand::CreateVector(query)
314 }
315 SqlStatement::Schema(SqlSchemaCommand::DropTable(query)) => {
316 SqlCommand::DropTable(query)
317 }
318 SqlStatement::Schema(SqlSchemaCommand::DropGraph(query)) => {
319 SqlCommand::DropGraph(query)
320 }
321 SqlStatement::Schema(SqlSchemaCommand::DropVector(query)) => {
322 SqlCommand::DropVector(query)
323 }
324 SqlStatement::Schema(SqlSchemaCommand::DropDocument(query)) => {
325 SqlCommand::DropDocument(query)
326 }
327 SqlStatement::Schema(SqlSchemaCommand::DropKv(query)) => SqlCommand::DropKv(query),
328 SqlStatement::Schema(SqlSchemaCommand::DropCollection(query)) => {
329 SqlCommand::DropCollection(query)
330 }
331 SqlStatement::Schema(SqlSchemaCommand::Truncate(query)) => SqlCommand::Truncate(query),
332 SqlStatement::Schema(SqlSchemaCommand::AlterTable(query)) => {
333 SqlCommand::AlterTable(query)
334 }
335 SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query)) => {
336 SqlCommand::CreateIndex(query)
337 }
338 SqlStatement::Schema(SqlSchemaCommand::DropIndex(query)) => {
339 SqlCommand::DropIndex(query)
340 }
341 SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query)) => {
342 SqlCommand::CreateTimeSeries(query)
343 }
344 SqlStatement::Schema(SqlSchemaCommand::CreateMetric(query)) => {
345 SqlCommand::CreateMetric(query)
346 }
347 SqlStatement::Schema(SqlSchemaCommand::AlterMetric(query)) => {
348 SqlCommand::AlterMetric(query)
349 }
350 SqlStatement::Schema(SqlSchemaCommand::CreateSlo(query)) => {
351 SqlCommand::CreateSlo(query)
352 }
353 SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query)) => {
354 SqlCommand::DropTimeSeries(query)
355 }
356 SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query)) => {
357 SqlCommand::CreateQueue(query)
358 }
359 SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query)) => {
360 SqlCommand::AlterQueue(query)
361 }
362 SqlStatement::Schema(SqlSchemaCommand::DropQueue(query)) => {
363 SqlCommand::DropQueue(query)
364 }
365 SqlStatement::Schema(SqlSchemaCommand::CreateTree(query)) => {
366 SqlCommand::CreateTree(query)
367 }
368 SqlStatement::Schema(SqlSchemaCommand::DropTree(query)) => SqlCommand::DropTree(query),
369 SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command)) => {
370 SqlCommand::Probabilistic(command)
371 }
372 SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value }) => {
373 SqlCommand::SetConfig { key, value }
374 }
375 SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix }) => {
376 SqlCommand::ShowConfig { prefix }
377 }
378 SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value }) => {
379 SqlCommand::SetSecret { key, value }
380 }
381 SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key }) => {
382 SqlCommand::DeleteSecret { key }
383 }
384 SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix }) => {
385 SqlCommand::ShowSecrets { prefix }
386 }
387 SqlStatement::Admin(SqlAdminCommand::SetTenant(value)) => SqlCommand::SetTenant(value),
388 SqlStatement::Admin(SqlAdminCommand::ShowTenant) => SqlCommand::ShowTenant,
389 SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl)) => {
390 SqlCommand::TransactionControl(ctl)
391 }
392 SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)) => SqlCommand::Maintenance(cmd),
393 SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)) => SqlCommand::CreateSchema(q),
394 SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)) => SqlCommand::DropSchema(q),
395 SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q)) => {
396 SqlCommand::CreateSequence(q)
397 }
398 SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)) => SqlCommand::DropSequence(q),
399 SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)) => SqlCommand::CopyFrom(q),
400 SqlStatement::Schema(SqlSchemaCommand::CreateView(q)) => SqlCommand::CreateView(q),
401 SqlStatement::Schema(SqlSchemaCommand::DropView(q)) => SqlCommand::DropView(q),
402 SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q)) => {
403 SqlCommand::RefreshMaterializedView(q)
404 }
405 SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)) => SqlCommand::CreatePolicy(q),
406 SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)) => SqlCommand::DropPolicy(q),
407 SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)) => SqlCommand::CreateServer(q),
408 SqlStatement::Schema(SqlSchemaCommand::DropServer(q)) => SqlCommand::DropServer(q),
409 SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q)) => {
410 SqlCommand::CreateForeignTable(q)
411 }
412 SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q)) => {
413 SqlCommand::DropForeignTable(q)
414 }
415 SqlStatement::Admin(SqlAdminCommand::Grant(s)) => SqlCommand::Grant(s),
416 SqlStatement::Admin(SqlAdminCommand::Revoke(s)) => SqlCommand::Revoke(s),
417 SqlStatement::Admin(SqlAdminCommand::AlterUser(s)) => SqlCommand::AlterUser(s),
418 SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)) => SqlCommand::IamPolicy(e),
419 SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q)) => {
420 SqlCommand::CreateMigration(q)
421 }
422 SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q)) => {
423 SqlCommand::ApplyMigration(q)
424 }
425 SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q)) => {
426 SqlCommand::RollbackMigration(q)
427 }
428 SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q)) => {
429 SqlCommand::ExplainMigration(q)
430 }
431 }
432 }
433
434 pub fn into_query_expr(self) -> QueryExpr {
435 self.into_command().into_query_expr()
436 }
437}
438
439impl FrontendStatement {
440 pub fn into_query_expr(self) -> QueryExpr {
441 match self {
442 FrontendStatement::Sql(statement) => statement.into_query_expr(),
443 FrontendStatement::Graph(query) => QueryExpr::Graph(query),
444 FrontendStatement::GraphCommand(command) => QueryExpr::GraphCommand(command),
445 FrontendStatement::Path(query) => QueryExpr::Path(query),
446 FrontendStatement::Vector(query) => QueryExpr::Vector(query),
447 FrontendStatement::Hybrid(query) => QueryExpr::Hybrid(query),
448 FrontendStatement::Search(command) => QueryExpr::SearchCommand(command),
449 FrontendStatement::Ask(query) => QueryExpr::Ask(query),
450 FrontendStatement::QueueSelect(query) => QueryExpr::QueueSelect(query),
451 FrontendStatement::QueueCommand(command) => QueryExpr::QueueCommand(command),
452 FrontendStatement::EventsBackfill(query) => QueryExpr::EventsBackfill(query),
453 FrontendStatement::EventsBackfillStatus { collection } => {
454 QueryExpr::EventsBackfillStatus { collection }
455 }
456 FrontendStatement::TreeCommand(command) => QueryExpr::TreeCommand(command),
457 FrontendStatement::ProbabilisticCommand(command) => {
458 QueryExpr::ProbabilisticCommand(command)
459 }
460 FrontendStatement::KvCommand(command) => QueryExpr::KvCommand(command),
461 FrontendStatement::ConfigCommand(command) => QueryExpr::ConfigCommand(command),
462 FrontendStatement::Ranking(expr) => expr,
463 }
464 }
465}
466
467pub fn parse_frontend(input: &str) -> Result<FrontendStatement, ParseError> {
468 let mut parser = Parser::new(input)?;
469 let statement = parser.parse_frontend_statement()?;
470 if !parser.check(&Token::Eof) {
471 return Err(ParseError::new(
472 format!("Unexpected token after query: {:?}", parser.current.token),
477 parser.position(),
478 ));
479 }
480 Ok(statement)
481}
482
483impl SqlCommand {
484 pub fn into_query_expr(self) -> QueryExpr {
485 match self {
486 SqlCommand::Select(query) => QueryExpr::Table(query),
487 SqlCommand::Join(query) => QueryExpr::Join(query),
488 SqlCommand::Insert(query) => QueryExpr::Insert(query),
489 SqlCommand::Update(query) => QueryExpr::Update(query),
490 SqlCommand::Delete(query) => QueryExpr::Delete(query),
491 SqlCommand::ExplainAlter(query) => QueryExpr::ExplainAlter(query),
492 SqlCommand::CreateTable(query) => QueryExpr::CreateTable(query),
493 SqlCommand::CreateCollection(query) => QueryExpr::CreateCollection(query),
494 SqlCommand::CreateVector(query) => QueryExpr::CreateVector(query),
495 SqlCommand::DropTable(query) => QueryExpr::DropTable(query),
496 SqlCommand::DropGraph(query) => QueryExpr::DropGraph(query),
497 SqlCommand::DropVector(query) => QueryExpr::DropVector(query),
498 SqlCommand::DropDocument(query) => QueryExpr::DropDocument(query),
499 SqlCommand::DropKv(query) => QueryExpr::DropKv(query),
500 SqlCommand::DropCollection(query) => QueryExpr::DropCollection(query),
501 SqlCommand::Truncate(query) => QueryExpr::Truncate(query),
502 SqlCommand::AlterTable(query) => QueryExpr::AlterTable(query),
503 SqlCommand::CreateIndex(query) => QueryExpr::CreateIndex(query),
504 SqlCommand::DropIndex(query) => QueryExpr::DropIndex(query),
505 SqlCommand::CreateTimeSeries(query) => QueryExpr::CreateTimeSeries(query),
506 SqlCommand::CreateMetric(query) => QueryExpr::CreateMetric(query),
507 SqlCommand::AlterMetric(query) => QueryExpr::AlterMetric(query),
508 SqlCommand::CreateSlo(query) => QueryExpr::CreateSlo(query),
509 SqlCommand::DropTimeSeries(query) => QueryExpr::DropTimeSeries(query),
510 SqlCommand::CreateQueue(query) => QueryExpr::CreateQueue(query),
511 SqlCommand::AlterQueue(query) => QueryExpr::AlterQueue(query),
512 SqlCommand::DropQueue(query) => QueryExpr::DropQueue(query),
513 SqlCommand::CreateTree(query) => QueryExpr::CreateTree(query),
514 SqlCommand::DropTree(query) => QueryExpr::DropTree(query),
515 SqlCommand::Probabilistic(command) => QueryExpr::ProbabilisticCommand(command),
516 SqlCommand::SetConfig { key, value } => QueryExpr::SetConfig { key, value },
517 SqlCommand::ShowConfig { prefix } => QueryExpr::ShowConfig { prefix },
518 SqlCommand::SetSecret { key, value } => QueryExpr::SetSecret { key, value },
519 SqlCommand::DeleteSecret { key } => QueryExpr::DeleteSecret { key },
520 SqlCommand::ShowSecrets { prefix } => QueryExpr::ShowSecrets { prefix },
521 SqlCommand::SetTenant(value) => QueryExpr::SetTenant(value),
522 SqlCommand::ShowTenant => QueryExpr::ShowTenant,
523 SqlCommand::TransactionControl(ctl) => QueryExpr::TransactionControl(ctl),
524 SqlCommand::Maintenance(cmd) => QueryExpr::MaintenanceCommand(cmd),
525 SqlCommand::CreateSchema(q) => QueryExpr::CreateSchema(q),
526 SqlCommand::DropSchema(q) => QueryExpr::DropSchema(q),
527 SqlCommand::CreateSequence(q) => QueryExpr::CreateSequence(q),
528 SqlCommand::DropSequence(q) => QueryExpr::DropSequence(q),
529 SqlCommand::CopyFrom(q) => QueryExpr::CopyFrom(q),
530 SqlCommand::CreateView(q) => QueryExpr::CreateView(q),
531 SqlCommand::DropView(q) => QueryExpr::DropView(q),
532 SqlCommand::RefreshMaterializedView(q) => QueryExpr::RefreshMaterializedView(q),
533 SqlCommand::CreatePolicy(q) => QueryExpr::CreatePolicy(q),
534 SqlCommand::DropPolicy(q) => QueryExpr::DropPolicy(q),
535 SqlCommand::CreateServer(q) => QueryExpr::CreateServer(q),
536 SqlCommand::DropServer(q) => QueryExpr::DropServer(q),
537 SqlCommand::CreateForeignTable(q) => QueryExpr::CreateForeignTable(q),
538 SqlCommand::DropForeignTable(q) => QueryExpr::DropForeignTable(q),
539 SqlCommand::Grant(s) => QueryExpr::Grant(s),
540 SqlCommand::Revoke(s) => QueryExpr::Revoke(s),
541 SqlCommand::AlterUser(s) => QueryExpr::AlterUser(s),
542 SqlCommand::IamPolicy(e) => e,
543 SqlCommand::CreateMigration(q) => QueryExpr::CreateMigration(q),
544 SqlCommand::ApplyMigration(q) => QueryExpr::ApplyMigration(q),
545 SqlCommand::RollbackMigration(q) => QueryExpr::RollbackMigration(q),
546 SqlCommand::ExplainMigration(q) => QueryExpr::ExplainMigration(q),
547 }
548 }
549
550 pub fn into_statement(self) -> SqlStatement {
551 match self {
552 SqlCommand::Select(query) => SqlStatement::Query(SqlQuery::Select(query)),
553 SqlCommand::Join(query) => SqlStatement::Query(SqlQuery::Join(query)),
554 SqlCommand::Insert(query) => SqlStatement::Mutation(SqlMutation::Insert(query)),
555 SqlCommand::Update(query) => SqlStatement::Mutation(SqlMutation::Update(query)),
556 SqlCommand::Delete(query) => SqlStatement::Mutation(SqlMutation::Delete(query)),
557 SqlCommand::ExplainAlter(query) => {
558 SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query))
559 }
560 SqlCommand::CreateTable(query) => {
561 SqlStatement::Schema(SqlSchemaCommand::CreateTable(query))
562 }
563 SqlCommand::CreateCollection(query) => {
564 SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query))
565 }
566 SqlCommand::CreateVector(query) => {
567 SqlStatement::Schema(SqlSchemaCommand::CreateVector(query))
568 }
569 SqlCommand::DropTable(query) => {
570 SqlStatement::Schema(SqlSchemaCommand::DropTable(query))
571 }
572 SqlCommand::DropGraph(query) => {
573 SqlStatement::Schema(SqlSchemaCommand::DropGraph(query))
574 }
575 SqlCommand::DropVector(query) => {
576 SqlStatement::Schema(SqlSchemaCommand::DropVector(query))
577 }
578 SqlCommand::DropDocument(query) => {
579 SqlStatement::Schema(SqlSchemaCommand::DropDocument(query))
580 }
581 SqlCommand::DropKv(query) => SqlStatement::Schema(SqlSchemaCommand::DropKv(query)),
582 SqlCommand::DropCollection(query) => {
583 SqlStatement::Schema(SqlSchemaCommand::DropCollection(query))
584 }
585 SqlCommand::Truncate(query) => SqlStatement::Schema(SqlSchemaCommand::Truncate(query)),
586 SqlCommand::AlterTable(query) => {
587 SqlStatement::Schema(SqlSchemaCommand::AlterTable(query))
588 }
589 SqlCommand::CreateIndex(query) => {
590 SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query))
591 }
592 SqlCommand::DropIndex(query) => {
593 SqlStatement::Schema(SqlSchemaCommand::DropIndex(query))
594 }
595 SqlCommand::CreateTimeSeries(query) => {
596 SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query))
597 }
598 SqlCommand::CreateMetric(query) => {
599 SqlStatement::Schema(SqlSchemaCommand::CreateMetric(query))
600 }
601 SqlCommand::AlterMetric(query) => {
602 SqlStatement::Schema(SqlSchemaCommand::AlterMetric(query))
603 }
604 SqlCommand::CreateSlo(query) => {
605 SqlStatement::Schema(SqlSchemaCommand::CreateSlo(query))
606 }
607 SqlCommand::DropTimeSeries(query) => {
608 SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query))
609 }
610 SqlCommand::CreateQueue(query) => {
611 SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query))
612 }
613 SqlCommand::AlterQueue(query) => {
614 SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query))
615 }
616 SqlCommand::DropQueue(query) => {
617 SqlStatement::Schema(SqlSchemaCommand::DropQueue(query))
618 }
619 SqlCommand::CreateTree(query) => {
620 SqlStatement::Schema(SqlSchemaCommand::CreateTree(query))
621 }
622 SqlCommand::DropTree(query) => SqlStatement::Schema(SqlSchemaCommand::DropTree(query)),
623 SqlCommand::Probabilistic(command) => {
624 SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command))
625 }
626 SqlCommand::SetConfig { key, value } => {
627 SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value })
628 }
629 SqlCommand::ShowConfig { prefix } => {
630 SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix })
631 }
632 SqlCommand::SetSecret { key, value } => {
633 SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value })
634 }
635 SqlCommand::DeleteSecret { key } => {
636 SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key })
637 }
638 SqlCommand::ShowSecrets { prefix } => {
639 SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix })
640 }
641 SqlCommand::SetTenant(value) => SqlStatement::Admin(SqlAdminCommand::SetTenant(value)),
642 SqlCommand::ShowTenant => SqlStatement::Admin(SqlAdminCommand::ShowTenant),
643 SqlCommand::TransactionControl(ctl) => {
644 SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl))
645 }
646 SqlCommand::Maintenance(cmd) => SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)),
647 SqlCommand::CreateSchema(q) => SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)),
648 SqlCommand::DropSchema(q) => SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)),
649 SqlCommand::CreateSequence(q) => {
650 SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q))
651 }
652 SqlCommand::DropSequence(q) => SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)),
653 SqlCommand::CopyFrom(q) => SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)),
654 SqlCommand::CreateView(q) => SqlStatement::Schema(SqlSchemaCommand::CreateView(q)),
655 SqlCommand::DropView(q) => SqlStatement::Schema(SqlSchemaCommand::DropView(q)),
656 SqlCommand::RefreshMaterializedView(q) => {
657 SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q))
658 }
659 SqlCommand::CreatePolicy(q) => SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)),
660 SqlCommand::DropPolicy(q) => SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)),
661 SqlCommand::CreateServer(q) => SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)),
662 SqlCommand::DropServer(q) => SqlStatement::Schema(SqlSchemaCommand::DropServer(q)),
663 SqlCommand::CreateForeignTable(q) => {
664 SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q))
665 }
666 SqlCommand::DropForeignTable(q) => {
667 SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q))
668 }
669 SqlCommand::Grant(s) => SqlStatement::Admin(SqlAdminCommand::Grant(s)),
670 SqlCommand::Revoke(s) => SqlStatement::Admin(SqlAdminCommand::Revoke(s)),
671 SqlCommand::AlterUser(s) => SqlStatement::Admin(SqlAdminCommand::AlterUser(s)),
672 SqlCommand::IamPolicy(e) => SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)),
673 SqlCommand::CreateMigration(q) => {
674 SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q))
675 }
676 SqlCommand::ApplyMigration(q) => {
677 SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q))
678 }
679 SqlCommand::RollbackMigration(q) => {
680 SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q))
681 }
682 SqlCommand::ExplainMigration(q) => {
683 SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q))
684 }
685 }
686 }
687}
688
689impl<'a> Parser<'a> {
690 fn parse_events_command(&mut self) -> Result<QueryExpr, ParseError> {
691 self.expect_ident()?; if self.consume_ident_ci("STATUS")? {
693 let mut query = TableQuery::new("red.subscriptions");
694 let collection = match self.peek().clone() {
695 Token::Ident(name) => {
696 self.advance()?;
697 Some(name)
698 }
699 Token::String(name) => {
700 self.advance()?;
701 Some(name)
702 }
703 _ => None,
704 };
705 self.parse_table_clauses(&mut query)?;
706 if let Some(collection) = collection {
707 let filter = Filter::compare(
708 FieldRef::column("red.subscriptions", "collection"),
709 CompareOp::Eq,
710 Value::text(collection),
711 );
712 let expr = filter_to_expr(&filter);
713 query.where_expr = Some(match query.where_expr.take() {
714 Some(existing) => Expr::binop(BinOp::And, existing, expr),
715 None => expr,
716 });
717 query.filter = Some(match query.filter.take() {
718 Some(existing) => existing.and(filter),
719 None => filter,
720 });
721 }
722 return Ok(QueryExpr::Table(query));
723 }
724
725 if !self.consume_ident_ci("BACKFILL")? {
726 return Err(ParseError::expected(
727 vec!["BACKFILL", "STATUS"],
728 self.peek(),
729 self.position(),
730 ));
731 }
732
733 if self.consume_ident_ci("STATUS")? {
734 let collection = self.expect_ident()?;
735 return Ok(QueryExpr::EventsBackfillStatus { collection });
736 }
737
738 let collection = self.expect_ident()?;
739 let where_filter = if self.consume(&Token::Where)? {
740 let mut parts = Vec::new();
741 while !self.check(&Token::Eof) && !self.check(&Token::To) {
742 parts.push(self.peek().to_string());
743 self.advance()?;
744 }
745 if parts.is_empty() {
746 return Err(ParseError::expected(
747 vec!["predicate"],
748 self.peek(),
749 self.position(),
750 ));
751 }
752 Some(parts.join(" "))
753 } else {
754 None
755 };
756
757 self.expect(Token::To)?;
758 let target_queue = self.expect_ident()?;
759 let limit = if self.consume(&Token::Limit)? {
760 Some(self.parse_positive_integer("LIMIT")? as u64)
761 } else {
762 None
763 };
764
765 Ok(QueryExpr::EventsBackfill(EventsBackfillQuery {
766 collection,
767 where_filter,
768 target_queue,
769 limit,
770 }))
771 }
772
773 pub(crate) fn parse_fdw_options_clause(&mut self) -> Result<Vec<(String, String)>, ParseError> {
778 if !self.consume(&Token::Options)? {
779 return Ok(Vec::new());
780 }
781 self.expect(Token::LParen)?;
782 let mut out: Vec<(String, String)> = Vec::new();
783 loop {
784 let was_ident = matches!(self.peek(), Token::Ident(_));
789 let raw = self.expect_ident_or_keyword()?;
790 let key = if was_ident {
791 raw
792 } else {
793 raw.to_ascii_lowercase()
794 };
795 let value = self.parse_string()?;
797 out.push((key, value));
798 if !self.consume(&Token::Comma)? {
799 break;
800 }
801 }
802 self.expect(Token::RParen)?;
803 Ok(out)
804 }
805
806 pub fn parse_frontend_statement(&mut self) -> Result<FrontendStatement, ParseError> {
808 match self.peek() {
809 Token::Select => match self.parse_select_query()? {
810 QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
811 SqlQuery::Select(query),
812 ))),
813 QueryExpr::Join(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
814 SqlQuery::Join(query),
815 ))),
816 QueryExpr::QueueSelect(query) => Ok(FrontendStatement::QueueSelect(query)),
817 other => Err(ParseError::new(
818 format!("internal: SELECT produced unexpected query kind {other:?}"),
819 self.position(),
820 )),
821 },
822 Token::From
823 | Token::Insert
824 | Token::Update
825 | Token::Truncate
826 | Token::Create
827 | Token::Drop
828 | Token::Alter
829 | Token::Set
830 | Token::Begin
831 | Token::Commit
832 | Token::Rollback
833 | Token::Savepoint
834 | Token::Release
835 | Token::Start
836 | Token::Vacuum
837 | Token::Analyze
838 | Token::Copy
839 | Token::Refresh => self.parse_sql_statement().map(FrontendStatement::Sql),
840 Token::Explain => {
841 if matches!(
842 self.peek_next()?,
843 Token::Ident(name) if name.eq_ignore_ascii_case("ASK")
844 ) {
845 match self.parse_explain_ask_query()? {
846 QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
847 other => Err(ParseError::new(
848 format!(
849 "internal: EXPLAIN ASK produced unexpected query kind {other:?}"
850 ),
851 self.position(),
852 )),
853 }
854 } else {
855 self.parse_sql_statement().map(FrontendStatement::Sql)
856 }
857 }
858 Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
859 self.parse_sql_statement().map(FrontendStatement::Sql)
860 }
861 Token::Ident(name)
862 if name.eq_ignore_ascii_case("RANK")
863 || name.eq_ignore_ascii_case("APPROX")
864 || name.eq_ignore_ascii_case("APPROXIMATE")
865 || name.eq_ignore_ascii_case("ZRANK")
866 || name.eq_ignore_ascii_case("ZRANGE") =>
867 {
868 self.parse_ranking_read().map(FrontendStatement::Ranking)
869 }
870 Token::Desc => self.parse_sql_statement().map(FrontendStatement::Sql),
871 Token::Ident(name)
872 if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
873 {
874 self.parse_sql_statement().map(FrontendStatement::Sql)
875 }
876 Token::Ident(name)
877 if name.eq_ignore_ascii_case("GRANT")
878 || name.eq_ignore_ascii_case("REVOKE")
879 || name.eq_ignore_ascii_case("SIMULATE")
880 || name.eq_ignore_ascii_case("LINT")
881 || name.eq_ignore_ascii_case("MIGRATE")
882 || name.eq_ignore_ascii_case("APPLY") =>
883 {
884 self.parse_sql_statement().map(FrontendStatement::Sql)
885 }
886 Token::Ident(name) if name.eq_ignore_ascii_case("WATCH") => {
887 self.advance()?;
888 if matches!(
889 self.peek(),
890 Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
891 ) {
892 match self.parse_config_watch_after_watch()? {
893 QueryExpr::ConfigCommand(command) => {
894 Ok(FrontendStatement::ConfigCommand(command))
895 }
896 other => Err(ParseError::new(
897 format!(
898 "internal: WATCH CONFIG produced unexpected query kind {other:?}"
899 ),
900 self.position(),
901 )),
902 }
903 } else if matches!(
904 self.peek(),
905 Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
906 ) {
907 match self.parse_vault_watch_after_watch()? {
908 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
909 other => Err(ParseError::new(
910 format!(
911 "internal: WATCH VAULT produced unexpected query kind {other:?}"
912 ),
913 self.position(),
914 )),
915 }
916 } else {
917 match self.parse_kv_watch(crate::catalog::CollectionModel::Kv)? {
918 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
919 other => Err(ParseError::new(
920 format!("internal: WATCH produced unexpected query kind {other:?}"),
921 self.position(),
922 )),
923 }
924 }
925 }
926 Token::List => {
927 self.advance()?;
928 if matches!(
929 self.peek(),
930 Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
931 ) {
932 match self.parse_config_list_after_list()? {
933 QueryExpr::ConfigCommand(command) => {
934 Ok(FrontendStatement::ConfigCommand(command))
935 }
936 other => Err(ParseError::new(
937 format!(
938 "internal: LIST CONFIG produced unexpected query kind {other:?}"
939 ),
940 self.position(),
941 )),
942 }
943 } else if matches!(
944 self.peek(),
945 Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
946 ) {
947 match self.parse_vault_list_after_list()? {
948 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
949 other => Err(ParseError::new(
950 format!(
951 "internal: LIST VAULT produced unexpected query kind {other:?}"
952 ),
953 self.position(),
954 )),
955 }
956 } else {
957 Err(ParseError::expected(
958 vec!["CONFIG", "VAULT"],
959 self.peek(),
960 self.position(),
961 ))
962 }
963 }
964 Token::Ident(name) if name.eq_ignore_ascii_case("LIST") => {
965 self.advance()?;
966 if matches!(
967 self.peek(),
968 Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
969 ) {
970 match self.parse_config_list_after_list()? {
971 QueryExpr::ConfigCommand(command) => {
972 Ok(FrontendStatement::ConfigCommand(command))
973 }
974 other => Err(ParseError::new(
975 format!(
976 "internal: LIST CONFIG produced unexpected query kind {other:?}"
977 ),
978 self.position(),
979 )),
980 }
981 } else if matches!(
982 self.peek(),
983 Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
984 ) {
985 match self.parse_vault_list_after_list()? {
986 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
987 other => Err(ParseError::new(
988 format!(
989 "internal: LIST VAULT produced unexpected query kind {other:?}"
990 ),
991 self.position(),
992 )),
993 }
994 } else {
995 Err(ParseError::expected(
996 vec!["CONFIG", "VAULT"],
997 self.peek(),
998 self.position(),
999 ))
1000 }
1001 }
1002 Token::Ident(name) if name.eq_ignore_ascii_case("INVALIDATE") => {
1003 if matches!(
1004 self.peek_next()?,
1005 Token::Ident(next) if next.eq_ignore_ascii_case("CONFIG")
1006 ) {
1007 match self.parse_config_command()? {
1008 QueryExpr::ConfigCommand(command) => {
1009 Ok(FrontendStatement::ConfigCommand(command))
1010 }
1011 other => Err(ParseError::new(
1012 format!("internal: CONFIG produced unexpected query kind {other:?}"),
1013 self.position(),
1014 )),
1015 }
1016 } else {
1017 self.advance()?;
1018 match self.parse_kv_invalidate_tags_after_invalidate()? {
1019 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1020 other => Err(ParseError::new(
1021 format!(
1022 "internal: INVALIDATE produced unexpected query kind {other:?}"
1023 ),
1024 self.position(),
1025 )),
1026 }
1027 }
1028 }
1029 Token::Attach | Token::Detach => self.parse_sql_statement().map(FrontendStatement::Sql),
1030 Token::Match => match self.parse_match_query()? {
1031 QueryExpr::Graph(query) => Ok(FrontendStatement::Graph(query)),
1032 other => Err(ParseError::new(
1033 format!("internal: MATCH produced unexpected query kind {other:?}"),
1034 self.position(),
1035 )),
1036 },
1037 Token::Path => match self.parse_path_query()? {
1038 QueryExpr::Path(query) => Ok(FrontendStatement::Path(query)),
1039 other => Err(ParseError::new(
1040 format!("internal: PATH produced unexpected query kind {other:?}"),
1041 self.position(),
1042 )),
1043 },
1044 Token::Vector => match self.parse_vector_query()? {
1045 QueryExpr::Vector(query) => Ok(FrontendStatement::Vector(query)),
1046 other => Err(ParseError::new(
1047 format!("internal: VECTOR produced unexpected query kind {other:?}"),
1048 self.position(),
1049 )),
1050 },
1051 Token::Hybrid => match self.parse_hybrid_query()? {
1052 QueryExpr::Hybrid(query) => Ok(FrontendStatement::Hybrid(query)),
1053 other => Err(ParseError::new(
1054 format!("internal: HYBRID produced unexpected query kind {other:?}"),
1055 self.position(),
1056 )),
1057 },
1058 Token::Graph => match self.parse_graph_command()? {
1059 QueryExpr::GraphCommand(command) => Ok(FrontendStatement::GraphCommand(command)),
1060 other => Err(ParseError::new(
1061 format!("internal: GRAPH produced unexpected query kind {other:?}"),
1062 self.position(),
1063 )),
1064 },
1065 Token::Search => match self.parse_search_command()? {
1066 QueryExpr::SearchCommand(command) => Ok(FrontendStatement::Search(command)),
1067 other => Err(ParseError::new(
1068 format!("internal: SEARCH produced unexpected query kind {other:?}"),
1069 self.position(),
1070 )),
1071 },
1072 Token::Ident(name) if name.eq_ignore_ascii_case("ASK") => {
1073 match self.parse_ask_query()? {
1074 QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
1075 other => Err(ParseError::new(
1076 format!("internal: ASK produced unexpected query kind {other:?}"),
1077 self.position(),
1078 )),
1079 }
1080 }
1081 Token::Ident(name) if name.eq_ignore_ascii_case("UNSEAL") => {
1082 match self.parse_unseal_vault_command()? {
1083 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1084 other => Err(ParseError::new(
1085 format!("internal: UNSEAL VAULT produced unexpected query kind {other:?}"),
1086 self.position(),
1087 )),
1088 }
1089 }
1090 Token::Queue => match self.parse_queue_command()? {
1091 QueryExpr::QueueCommand(command) => Ok(FrontendStatement::QueueCommand(command)),
1092 other => Err(ParseError::new(
1093 format!("internal: QUEUE produced unexpected query kind {other:?}"),
1094 self.position(),
1095 )),
1096 },
1097 Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
1098 match self.parse_events_command()? {
1099 QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
1100 SqlQuery::Select(query),
1101 ))),
1102 QueryExpr::EventsBackfill(query) => {
1103 Ok(FrontendStatement::EventsBackfill(query))
1104 }
1105 QueryExpr::EventsBackfillStatus { collection } => {
1106 Ok(FrontendStatement::EventsBackfillStatus { collection })
1107 }
1108 other => Err(ParseError::new(
1109 format!("internal: EVENTS produced unexpected query kind {other:?}"),
1110 self.position(),
1111 )),
1112 }
1113 }
1114 Token::Kv => match self.parse_kv_command()? {
1115 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1116 other => Err(ParseError::new(
1117 format!("internal: KV produced unexpected query kind {other:?}"),
1118 self.position(),
1119 )),
1120 },
1121 Token::Delete => {
1122 if matches!(
1123 self.peek_next()?,
1124 Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
1125 ) {
1126 match self.parse_config_command()? {
1127 QueryExpr::ConfigCommand(command) => {
1128 Ok(FrontendStatement::ConfigCommand(command))
1129 }
1130 other => Err(ParseError::new(
1131 format!("internal: CONFIG produced unexpected query kind {other:?}"),
1132 self.position(),
1133 )),
1134 }
1135 } else if matches!(
1136 self.peek_next()?,
1137 Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
1138 ) {
1139 match self.parse_vault_lifecycle_command()? {
1140 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1141 other => Err(ParseError::new(
1142 format!("internal: VAULT produced unexpected query kind {other:?}"),
1143 self.position(),
1144 )),
1145 }
1146 } else {
1147 self.parse_sql_statement().map(FrontendStatement::Sql)
1148 }
1149 }
1150 Token::Add => match self.parse_config_command()? {
1151 QueryExpr::ConfigCommand(command) => Ok(FrontendStatement::ConfigCommand(command)),
1152 other => Err(ParseError::new(
1153 format!("internal: CONFIG produced unexpected query kind {other:?}"),
1154 self.position(),
1155 )),
1156 },
1157 Token::Purge => match self.parse_vault_lifecycle_command()? {
1158 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1159 other => Err(ParseError::new(
1160 format!("internal: VAULT produced unexpected query kind {other:?}"),
1161 self.position(),
1162 )),
1163 },
1164 Token::Ident(name)
1165 if name.eq_ignore_ascii_case("PUT")
1166 || name.eq_ignore_ascii_case("GET")
1167 || name.eq_ignore_ascii_case("RESOLVE")
1168 || name.eq_ignore_ascii_case("ROTATE")
1169 || name.eq_ignore_ascii_case("HISTORY")
1170 || name.eq_ignore_ascii_case("PURGE")
1171 || name.eq_ignore_ascii_case("INCR")
1172 || name.eq_ignore_ascii_case("DECR")
1173 || name.eq_ignore_ascii_case("INVALIDATE") =>
1174 {
1175 if matches!(
1176 self.peek_next()?,
1177 Token::Ident(next) if next.eq_ignore_ascii_case("VAULT")
1178 ) {
1179 match self.parse_vault_lifecycle_command()? {
1180 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1181 other => Err(ParseError::new(
1182 format!("internal: VAULT produced unexpected query kind {other:?}"),
1183 self.position(),
1184 )),
1185 }
1186 } else {
1187 match self.parse_config_command()? {
1188 QueryExpr::ConfigCommand(command) => {
1189 Ok(FrontendStatement::ConfigCommand(command))
1190 }
1191 other => Err(ParseError::new(
1192 format!("internal: CONFIG produced unexpected query kind {other:?}"),
1193 self.position(),
1194 )),
1195 }
1196 }
1197 }
1198 Token::Ident(name) if name.eq_ignore_ascii_case("VAULT") => {
1199 match self.parse_vault_command()? {
1200 QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
1201 other => Err(ParseError::new(
1202 format!("internal: VAULT produced unexpected query kind {other:?}"),
1203 self.position(),
1204 )),
1205 }
1206 }
1207 Token::Tree => match self.parse_tree_command()? {
1208 QueryExpr::TreeCommand(command) => Ok(FrontendStatement::TreeCommand(command)),
1209 other => Err(ParseError::new(
1210 format!("internal: TREE produced unexpected query kind {other:?}"),
1211 self.position(),
1212 )),
1213 },
1214 Token::Ident(name) if name.eq_ignore_ascii_case("HLL") => {
1215 match self.parse_hll_command()? {
1216 QueryExpr::ProbabilisticCommand(command) => {
1217 Ok(FrontendStatement::ProbabilisticCommand(command))
1218 }
1219 other => Err(ParseError::new(
1220 format!("internal: HLL produced unexpected query kind {other:?}"),
1221 self.position(),
1222 )),
1223 }
1224 }
1225 Token::Ident(name) if name.eq_ignore_ascii_case("SKETCH") => {
1226 match self.parse_sketch_command()? {
1227 QueryExpr::ProbabilisticCommand(command) => {
1228 Ok(FrontendStatement::ProbabilisticCommand(command))
1229 }
1230 other => Err(ParseError::new(
1231 format!("internal: SKETCH produced unexpected query kind {other:?}"),
1232 self.position(),
1233 )),
1234 }
1235 }
1236 Token::Ident(name) if name.eq_ignore_ascii_case("FILTER") => {
1237 match self.parse_filter_command()? {
1238 QueryExpr::ProbabilisticCommand(command) => {
1239 Ok(FrontendStatement::ProbabilisticCommand(command))
1240 }
1241 other => Err(ParseError::new(
1242 format!("internal: FILTER produced unexpected query kind {other:?}"),
1243 self.position(),
1244 )),
1245 }
1246 }
1247 Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => self
1248 .parse_sql_command()
1249 .map(SqlCommand::into_statement)
1250 .map(FrontendStatement::Sql),
1251 other => Err(ParseError::expected(
1252 vec![
1253 "SELECT", "MATCH", "PATH", "FROM", "VECTOR", "HYBRID", "INSERT", "UPDATE",
1254 "DELETE", "TRUNCATE", "CREATE", "DROP", "ALTER", "GRAPH", "SEARCH", "ASK",
1255 "QUEUE", "EVENTS", "KV", "HLL", "TREE", "SKETCH", "FILTER", "SET", "SHOW",
1256 "DESCRIBE", "DESC", "RANK", "ZRANK", "ZRANGE",
1257 ],
1258 other,
1259 self.position(),
1260 )),
1261 }
1262 }
1263
1264 fn parse_ranking_read(&mut self) -> Result<QueryExpr, ParseError> {
1265 let head = self.expect_ident()?;
1266 if head.eq_ignore_ascii_case("RANK") {
1267 return self.parse_rank_after_rank(false);
1268 }
1269 if head.eq_ignore_ascii_case("APPROX") || head.eq_ignore_ascii_case("APPROXIMATE") {
1270 if !self.consume_ident_ci("RANK")? {
1271 return Err(ParseError::expected(
1272 vec!["RANK"],
1273 self.peek(),
1274 self.position(),
1275 ));
1276 }
1277 return self.parse_rank_after_rank(true);
1278 }
1279 if head.eq_ignore_ascii_case("ZRANK") {
1280 return self.parse_zrank();
1281 }
1282 if head.eq_ignore_ascii_case("ZRANGE") {
1283 return self.parse_zrange();
1284 }
1285 Err(ParseError::expected(
1286 vec!["RANK", "APPROX RANK", "ZRANK", "ZRANGE"],
1287 self.peek(),
1288 self.position(),
1289 ))
1290 }
1291
1292 fn parse_rank_after_rank(&mut self, approximate: bool) -> Result<QueryExpr, ParseError> {
1293 if self.consume(&Token::Of)? {
1294 let entity_id = self.parse_u64_slot("rank entity id")?;
1295 self.expect(Token::In)?;
1296 let ranking = self.expect_ident()?;
1297 let query = RankOfQuery { ranking, entity_id };
1298 return Ok(if approximate {
1299 QueryExpr::ApproxRankOf(query)
1300 } else {
1301 QueryExpr::RankOf(query)
1302 });
1303 }
1304
1305 if !approximate && self.consume(&Token::Range)? {
1306 let lo = self.parse_positive_u64_slot("rank range lower bound")?;
1307 self.expect(Token::To)?;
1308 let hi = self.parse_positive_u64_slot("rank range upper bound")?;
1309 if hi < lo {
1310 return Err(ParseError::value_out_of_range(
1311 "rank range upper bound",
1312 "must be greater than or equal to the lower bound",
1313 self.position(),
1314 ));
1315 }
1316 self.expect(Token::In)?;
1317 let ranking = self.expect_ident()?;
1318 return Ok(QueryExpr::RankRange(RankRangeQuery { ranking, lo, hi }));
1319 }
1320
1321 Err(ParseError::expected(
1322 if approximate {
1323 vec!["OF"]
1324 } else {
1325 vec!["OF", "RANGE"]
1326 },
1327 self.peek(),
1328 self.position(),
1329 ))
1330 }
1331
1332 fn parse_zrank(&mut self) -> Result<QueryExpr, ParseError> {
1333 let ranking = self.expect_ident()?;
1334 let entity_id = self.parse_u64_slot("ZRANK entity id")?;
1335 Ok(QueryExpr::RankOf(RankOfQuery { ranking, entity_id }))
1336 }
1337
1338 fn parse_zrange(&mut self) -> Result<QueryExpr, ParseError> {
1339 let ranking = self.expect_ident()?;
1340 let start = self.parse_u64_slot("ZRANGE start")?;
1341 let stop = self.parse_u64_slot("ZRANGE stop")?;
1342 if stop < start {
1343 return Err(ParseError::value_out_of_range(
1344 "ZRANGE stop",
1345 "must be greater than or equal to start",
1346 self.position(),
1347 ));
1348 }
1349 let _with_scores = self.consume_ident_ci("WITHSCORES")?;
1350 Ok(QueryExpr::RankRange(RankRangeQuery {
1351 ranking,
1352 lo: start + 1,
1353 hi: stop + 1,
1354 }))
1355 }
1356
1357 fn parse_positive_u64_slot(&mut self, field: &'static str) -> Result<u64, ParseError> {
1358 let value = self.parse_u64_slot(field)?;
1359 if value == 0 {
1360 return Err(ParseError::value_out_of_range(
1361 field,
1362 "must be a positive integer",
1363 self.position(),
1364 ));
1365 }
1366 Ok(value)
1367 }
1368
1369 fn parse_u64_slot(&mut self, field: &'static str) -> Result<u64, ParseError> {
1370 let pos = self.position();
1371 if matches!(self.peek(), Token::Minus | Token::Dash) {
1372 return Err(ParseError::value_out_of_range(
1373 field,
1374 "must be an unsigned integer",
1375 pos,
1376 ));
1377 }
1378 let raw = self.parse_integer()?;
1379 u64::try_from(raw)
1380 .map_err(|_| ParseError::value_out_of_range(field, "must be an unsigned integer", pos))
1381 }
1382
1383 pub fn parse_sql_statement(&mut self) -> Result<SqlStatement, ParseError> {
1385 self.parse_sql_command().map(SqlCommand::into_statement)
1386 }
1387
1388 fn parse_dotted_admin_path(&mut self, lowercase: bool) -> Result<String, ParseError> {
1389 let mut path = self.expect_ident()?;
1390 while self.consume(&Token::Dot)? {
1391 let next = self.expect_ident_or_keyword()?;
1392 path = format!("{path}.{next}");
1393 }
1394 Ok(if lowercase {
1395 path.to_ascii_lowercase()
1396 } else {
1397 path
1398 })
1399 }
1400
1401 #[inline(never)]
1410 fn parse_create_command(&mut self) -> Result<SqlCommand, ParseError> {
1411 let pos = self.position();
1412 self.advance()?;
1413
1414 let mut or_replace = false;
1418 if self.consume(&Token::Or)? || self.consume_ident_ci("OR")? {
1419 let _ = self.consume_ident_ci("REPLACE")?;
1420 or_replace = true;
1421 }
1422 let materialized = self.consume(&Token::Materialized)?;
1423 if self.check(&Token::View) {
1424 self.advance()?;
1425 let if_not_exists = self.match_if_not_exists()?;
1426 let name = self.expect_ident()?;
1427 let mut retention_duration_ms: Option<u64> = None;
1435 if self.check(&Token::With) {
1436 self.advance()?;
1437 if !self.consume(&Token::Retention)? && !self.consume_ident_ci("RETENTION")? {
1438 return Err(ParseError::expected(
1439 vec!["RETENTION"],
1440 self.peek(),
1441 self.position(),
1442 ));
1443 }
1444 if !materialized {
1445 return Err(ParseError::new(
1446 "WITH RETENTION is only valid on \
1447 CREATE MATERIALIZED VIEW"
1448 .to_string(),
1449 self.position(),
1450 ));
1451 }
1452 let value = self.parse_float()?;
1453 let unit_mult = self.parse_duration_unit()?;
1454 retention_duration_ms = Some((value * unit_mult).round() as u64);
1455 }
1456 if !self.consume(&Token::As)? && !self.consume_ident_ci("AS")? {
1459 return Err(ParseError::expected(
1460 vec!["AS"],
1461 self.peek(),
1462 self.position(),
1463 ));
1464 }
1465 let body = self.parse_sql_command()?.into_query_expr();
1468 let mut refresh_every_ms: Option<u64> = None;
1473 if self.check(&Token::Refresh) {
1474 if !materialized {
1475 return Err(ParseError::new(
1476 "REFRESH EVERY is only valid on \
1477 CREATE MATERIALIZED VIEW"
1478 .to_string(),
1479 self.position(),
1480 ));
1481 }
1482 self.advance()?;
1483 if !self.consume_ident_ci("EVERY")? {
1484 return Err(ParseError::expected(
1485 vec!["EVERY"],
1486 self.peek(),
1487 self.position(),
1488 ));
1489 }
1490 let value = self.parse_float()?;
1491 let unit_mult = self.parse_duration_unit()?;
1492 refresh_every_ms = Some((value * unit_mult).round() as u64);
1493 }
1494 return Ok(SqlCommand::CreateView(CreateViewQuery {
1495 name,
1496 query: Box::new(body),
1497 materialized,
1498 if_not_exists,
1499 or_replace,
1500 refresh_every_ms,
1501 retention_duration_ms,
1502 }));
1503 }
1504 if or_replace || materialized {
1507 return Err(ParseError::expected(
1508 vec!["VIEW"],
1509 self.peek(),
1510 self.position(),
1511 ));
1512 }
1513
1514 if self.check(&Token::Index) || self.check(&Token::Unique) {
1515 match self.parse_create_index_query()? {
1516 QueryExpr::CreateIndex(query) => Ok(SqlCommand::CreateIndex(query)),
1517 other => Err(ParseError::new(
1518 format!("internal: CREATE INDEX produced unexpected kind {other:?}"),
1519 self.position(),
1520 )),
1521 }
1522 } else if self.check(&Token::Table) {
1523 self.expect(Token::Table)?;
1524 match self.parse_create_table_body()? {
1525 QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1526 other => Err(ParseError::new(
1527 format!("internal: CREATE TABLE produced unexpected kind {other:?}"),
1528 self.position(),
1529 )),
1530 }
1531 } else if self.check(&Token::Graph) {
1532 self.advance()?;
1533 match self.parse_create_collection_model_body(CollectionModel::Graph)? {
1534 QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1535 other => Err(ParseError::new(
1536 format!("internal: CREATE GRAPH produced unexpected kind {other:?}"),
1537 self.position(),
1538 )),
1539 }
1540 } else if self.check(&Token::Document) {
1541 self.advance()?;
1542 match self.parse_create_collection_model_body(CollectionModel::Document)? {
1543 QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1544 other => Err(ParseError::new(
1545 format!("internal: CREATE DOCUMENT produced unexpected kind {other:?}"),
1546 self.position(),
1547 )),
1548 }
1549 } else if self.check(&Token::Vector) {
1550 self.advance()?;
1551 match self.parse_create_vector_body()? {
1552 QueryExpr::CreateVector(query) => Ok(SqlCommand::CreateVector(query)),
1553 other => Err(ParseError::new(
1554 format!("internal: CREATE VECTOR produced unexpected kind {other:?}"),
1555 self.position(),
1556 )),
1557 }
1558 } else if self.check(&Token::Collection) {
1559 self.advance()?;
1560 match self.parse_create_collection_body()? {
1561 QueryExpr::CreateCollection(query) => Ok(SqlCommand::CreateCollection(query)),
1562 other => Err(ParseError::new(
1563 format!("internal: CREATE COLLECTION produced unexpected kind {other:?}"),
1564 self.position(),
1565 )),
1566 }
1567 } else if self.check(&Token::Kv) {
1568 self.advance()?;
1569 match self.parse_create_keyed_body(CollectionModel::Kv)? {
1570 QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1571 other => Err(ParseError::new(
1572 format!("internal: CREATE KV produced unexpected kind {other:?}"),
1573 self.position(),
1574 )),
1575 }
1576 } else if self.consume_ident_ci("CONFIG")? {
1577 match self.parse_create_keyed_body(CollectionModel::Config)? {
1578 QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1579 other => Err(ParseError::new(
1580 format!("internal: CREATE CONFIG produced unexpected kind {other:?}"),
1581 self.position(),
1582 )),
1583 }
1584 } else if self.consume_ident_ci("VAULT")? {
1585 match self.parse_create_keyed_body(CollectionModel::Vault)? {
1586 QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1587 other => Err(ParseError::new(
1588 format!("internal: CREATE VAULT produced unexpected kind {other:?}"),
1589 self.position(),
1590 )),
1591 }
1592 } else if self.check(&Token::Timeseries) {
1593 self.advance()?;
1594 match self.parse_create_timeseries_body()? {
1595 QueryExpr::CreateTimeSeries(query) => Ok(SqlCommand::CreateTimeSeries(query)),
1596 other => Err(ParseError::new(
1597 format!("internal: CREATE TIMESERIES produced unexpected kind {other:?}"),
1598 self.position(),
1599 )),
1600 }
1601 } else if self.check(&Token::Metric) {
1602 self.advance()?;
1603 match self.parse_create_metric_body()? {
1604 QueryExpr::CreateMetric(query) => Ok(SqlCommand::CreateMetric(query)),
1605 other => Err(ParseError::new(
1606 format!("internal: CREATE METRIC produced unexpected kind {other:?}"),
1607 self.position(),
1608 )),
1609 }
1610 } else if self.consume_ident_ci("METRICS")? {
1611 match self.parse_create_metrics_body()? {
1612 QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
1613 other => Err(ParseError::new(
1614 format!("internal: CREATE METRICS produced unexpected kind {other:?}"),
1615 self.position(),
1616 )),
1617 }
1618 } else if self.consume_ident_ci("SLO")? {
1619 match self.parse_create_slo_body()? {
1620 QueryExpr::CreateSlo(query) => Ok(SqlCommand::CreateSlo(query)),
1621 other => Err(ParseError::new(
1622 format!("internal: CREATE SLO produced unexpected kind {other:?}"),
1623 self.position(),
1624 )),
1625 }
1626 } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE")) {
1627 self.advance()?;
1628 match self.parse_create_hypertable_body()? {
1629 QueryExpr::CreateTimeSeries(query) => Ok(SqlCommand::CreateTimeSeries(query)),
1630 other => Err(ParseError::new(
1631 format!("internal: CREATE HYPERTABLE produced unexpected kind {other:?}"),
1632 self.position(),
1633 )),
1634 }
1635 } else if self.check(&Token::Queue) {
1636 self.advance()?;
1637 match self.parse_create_queue_body()? {
1638 QueryExpr::CreateQueue(query) => Ok(SqlCommand::CreateQueue(query)),
1639 other => Err(ParseError::new(
1640 format!("internal: CREATE QUEUE produced unexpected kind {other:?}"),
1641 self.position(),
1642 )),
1643 }
1644 } else if self.check(&Token::Tree) {
1645 self.advance()?;
1646 match self.parse_create_tree_body()? {
1647 QueryExpr::CreateTree(query) => Ok(SqlCommand::CreateTree(query)),
1648 other => Err(ParseError::new(
1649 format!("internal: CREATE TREE produced unexpected kind {other:?}"),
1650 self.position(),
1651 )),
1652 }
1653 } else if matches!(self.peek(), Token::Ident(n) if
1654 n.eq_ignore_ascii_case("HLL") ||
1655 n.eq_ignore_ascii_case("SKETCH") ||
1656 n.eq_ignore_ascii_case("FILTER"))
1657 {
1658 match self.parse_create_probabilistic()? {
1659 QueryExpr::ProbabilisticCommand(command) => Ok(SqlCommand::Probabilistic(command)),
1660 other => Err(ParseError::new(
1661 format!("internal: CREATE probabilistic produced unexpected kind {other:?}"),
1662 self.position(),
1663 )),
1664 }
1665 } else if self.check(&Token::Schema) {
1666 self.advance()?;
1668 let if_not_exists = self.match_if_not_exists()?;
1669 let name = self.expect_ident()?;
1670 Ok(SqlCommand::CreateSchema(CreateSchemaQuery {
1671 name,
1672 if_not_exists,
1673 }))
1674 } else if self.check(&Token::Policy) {
1675 self.advance()?;
1680 if matches!(self.peek(), Token::String(_)) {
1681 let expr = self.parse_create_iam_policy_after_keywords()?;
1683 return Ok(SqlCommand::IamPolicy(expr));
1689 }
1690 let name = self.expect_ident()?;
1691 self.expect(Token::On)?;
1692
1693 let (target_kind, table) = {
1694 use crate::storage::query::ast::PolicyTargetKind;
1695 let kw = match self.peek() {
1696 Token::Ident(s) => Some(s.to_ascii_uppercase()),
1697 _ => None,
1698 };
1699 let kind = kw.as_deref().and_then(|k| match k {
1700 "NODES" => Some(PolicyTargetKind::Nodes),
1701 "EDGES" => Some(PolicyTargetKind::Edges),
1702 "VECTORS" => Some(PolicyTargetKind::Vectors),
1703 "MESSAGES" => Some(PolicyTargetKind::Messages),
1704 "POINTS" => Some(PolicyTargetKind::Points),
1705 "DOCUMENTS" => Some(PolicyTargetKind::Documents),
1706 _ => None,
1707 });
1708 if let Some(k) = kind {
1709 self.advance()?;
1710 self.expect(Token::Of)?;
1711 let coll = self.expect_ident()?;
1712 (k, coll)
1713 } else {
1714 let coll = self.expect_ident()?;
1715 (PolicyTargetKind::Table, coll)
1716 }
1717 };
1718
1719 let action = if self.consume(&Token::For)? {
1720 let a = match self.peek() {
1721 Token::Select => {
1722 self.advance()?;
1723 Some(PolicyAction::Select)
1724 }
1725 Token::Insert => {
1726 self.advance()?;
1727 Some(PolicyAction::Insert)
1728 }
1729 Token::Update => {
1730 self.advance()?;
1731 Some(PolicyAction::Update)
1732 }
1733 Token::Delete => {
1734 self.advance()?;
1735 Some(PolicyAction::Delete)
1736 }
1737 Token::All => {
1738 self.advance()?;
1739 None
1740 }
1741 _ => None,
1742 };
1743 a
1744 } else {
1745 None
1746 };
1747
1748 let role = if self.consume(&Token::To)? {
1749 Some(self.expect_ident()?)
1750 } else {
1751 None
1752 };
1753
1754 self.expect(Token::Using)?;
1755 self.expect(Token::LParen)?;
1756 let filter = self.parse_filter()?;
1757 self.expect(Token::RParen)?;
1758
1759 Ok(SqlCommand::CreatePolicy(CreatePolicyQuery {
1760 name,
1761 table,
1762 action,
1763 role,
1764 using: Box::new(filter),
1765 target_kind,
1766 }))
1767 } else if self.check(&Token::Server) {
1768 self.advance()?;
1772 let if_not_exists = self.match_if_not_exists()?;
1773 let name = self.expect_ident()?;
1774 self.expect(Token::Foreign)?;
1775 self.expect(Token::Data)?;
1776 self.expect(Token::Wrapper)?;
1777 let wrapper = self.expect_ident()?;
1778 let options = self.parse_fdw_options_clause()?;
1779 Ok(SqlCommand::CreateServer(CreateServerQuery {
1780 name,
1781 wrapper,
1782 options,
1783 if_not_exists,
1784 }))
1785 } else if self.check(&Token::Foreign) {
1786 self.advance()?;
1790 self.expect(Token::Table)?;
1791 let if_not_exists = self.match_if_not_exists()?;
1792 let name = self.expect_ident()?;
1793 self.expect(Token::LParen)?;
1794 let mut columns = Vec::new();
1795 loop {
1796 let col_name = self.expect_ident()?;
1797 let data_type = self.expect_ident_or_keyword()?;
1798 let mut not_null = false;
1801 if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NOT")) {
1802 self.advance()?;
1803 if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NULL")) {
1804 self.advance()?;
1805 not_null = true;
1806 }
1807 }
1808 columns.push(ForeignColumnDef {
1809 name: col_name,
1810 data_type,
1811 not_null,
1812 });
1813 if !self.consume(&Token::Comma)? {
1814 break;
1815 }
1816 }
1817 self.expect(Token::RParen)?;
1818 self.expect(Token::Server)?;
1819 let server = self.expect_ident()?;
1820 let options = self.parse_fdw_options_clause()?;
1821 Ok(SqlCommand::CreateForeignTable(CreateForeignTableQuery {
1822 name,
1823 server,
1824 columns,
1825 options,
1826 if_not_exists,
1827 }))
1828 } else if self.check(&Token::Sequence) {
1829 self.advance()?;
1832 let if_not_exists = self.match_if_not_exists()?;
1833 let name = self.expect_ident()?;
1834 let mut start: i64 = 1;
1835 let mut increment: i64 = 1;
1836 loop {
1838 if self.consume(&Token::Start)? {
1839 let _ = self.consume(&Token::With)? || self.consume_ident_ci("WITH")?;
1841 start = self.parse_integer()?;
1842 } else if self.consume(&Token::Increment)? {
1843 let _ = self.consume(&Token::By)? || self.consume_ident_ci("BY")?;
1845 increment = self.parse_integer()?;
1846 } else {
1847 break;
1848 }
1849 }
1850 Ok(SqlCommand::CreateSequence(CreateSequenceQuery {
1851 name,
1852 if_not_exists,
1853 start,
1854 increment,
1855 }))
1856 } else if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
1857 self.advance()?; match self.parse_create_migration_body()? {
1859 QueryExpr::CreateMigration(q) => Ok(SqlCommand::CreateMigration(q)),
1860 other => Err(ParseError::new(
1861 format!("internal: CREATE MIGRATION produced unexpected kind {other:?}"),
1862 self.position(),
1863 )),
1864 }
1865 } else if let Some(reason) = analytics_v0_non_goal_create(self.peek()) {
1866 Err(ParseError::new(reason, self.position()))
1874 } else if let Some(err) =
1875 ParseError::unsupported_recognized_token(self.peek(), self.position())
1876 {
1877 Err(err)
1878 } else {
1879 Err(ParseError::expected(
1880 vec![
1881 "TABLE",
1882 "GRAPH",
1883 "VECTOR",
1884 "DOCUMENT",
1885 "KV",
1886 "COLLECTION",
1887 "INDEX",
1888 "UNIQUE",
1889 "METRIC",
1890 "TIMESERIES",
1891 "QUEUE",
1892 "TREE",
1893 "HLL",
1894 "SKETCH",
1895 "FILTER",
1896 "SCHEMA",
1897 "SEQUENCE",
1898 "MIGRATION",
1899 ],
1900 self.peek(),
1901 pos,
1902 ))
1903 }
1904 }
1905
1906 pub fn parse_sql_command(&mut self) -> Result<SqlCommand, ParseError> {
1907 match self.peek() {
1908 Token::Select => match self.parse_select_query()? {
1909 QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1910 QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1911 other => Err(ParseError::new(
1912 format!("internal: SELECT produced unexpected query kind {other:?}"),
1913 self.position(),
1914 )),
1915 },
1916 Token::From => match self.parse_from_query()? {
1917 QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
1918 QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
1919 other => Err(ParseError::new(
1920 format!("internal: FROM produced unexpected query kind {other:?}"),
1921 self.position(),
1922 )),
1923 },
1924 Token::Insert => match self.parse_insert_query()? {
1925 QueryExpr::Insert(query) => Ok(SqlCommand::Insert(query)),
1926 other => Err(ParseError::new(
1927 format!("internal: INSERT produced unexpected query kind {other:?}"),
1928 self.position(),
1929 )),
1930 },
1931 Token::Update => match self.parse_update_query()? {
1932 QueryExpr::Update(query) => Ok(SqlCommand::Update(query)),
1933 other => Err(ParseError::new(
1934 format!("internal: UPDATE produced unexpected query kind {other:?}"),
1935 self.position(),
1936 )),
1937 },
1938 Token::Delete => {
1939 if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("SECRET"))
1940 {
1941 self.advance()?; self.advance()?; let key = self.parse_dotted_admin_path(true)?;
1944 Ok(SqlCommand::DeleteSecret { key })
1945 } else {
1946 match self.parse_delete_query()? {
1947 QueryExpr::Delete(query) => Ok(SqlCommand::Delete(query)),
1948 other => Err(ParseError::new(
1949 format!("internal: DELETE produced unexpected query kind {other:?}"),
1950 self.position(),
1951 )),
1952 }
1953 }
1954 }
1955 Token::Truncate => {
1956 self.advance()?;
1957 let model = if self.consume(&Token::Table)? {
1958 Some(CollectionModel::Table)
1959 } else if self.consume(&Token::Graph)? {
1960 Some(CollectionModel::Graph)
1961 } else if self.consume(&Token::Vector)? {
1962 Some(CollectionModel::Vector)
1963 } else if self.consume(&Token::Document)? {
1964 Some(CollectionModel::Document)
1965 } else if self.consume(&Token::Timeseries)? {
1966 Some(CollectionModel::TimeSeries)
1967 } else if self.consume_ident_ci("METRICS")? {
1968 Some(CollectionModel::Metrics)
1969 } else if self.consume(&Token::Kv)? {
1970 Some(CollectionModel::Kv)
1971 } else if self.consume(&Token::Queue)? {
1972 Some(CollectionModel::Queue)
1973 } else if self.consume(&Token::Collection)? {
1974 None
1975 } else {
1976 return Err(ParseError::expected(
1977 vec![
1978 "TABLE",
1979 "GRAPH",
1980 "VECTOR",
1981 "DOCUMENT",
1982 "TIMESERIES",
1983 "METRICS",
1984 "KV",
1985 "QUEUE",
1986 "COLLECTION",
1987 ],
1988 self.peek(),
1989 self.position(),
1990 ));
1991 };
1992 match self.parse_truncate_body(model)? {
1993 QueryExpr::Truncate(query) => Ok(SqlCommand::Truncate(query)),
1994 other => Err(ParseError::new(
1995 format!("internal: TRUNCATE produced unexpected kind {other:?}"),
1996 self.position(),
1997 )),
1998 }
1999 }
2000 Token::Explain => {
2001 if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
2004 {
2005 self.advance()?; match self.parse_explain_migration_after_keyword()? {
2007 QueryExpr::ExplainMigration(q) => Ok(SqlCommand::ExplainMigration(q)),
2008 other => Err(ParseError::new(
2009 format!(
2010 "internal: EXPLAIN MIGRATION produced unexpected kind {other:?}"
2011 ),
2012 self.position(),
2013 )),
2014 }
2015 } else {
2016 match self.parse_explain_alter_query()? {
2017 QueryExpr::ExplainAlter(query) => Ok(SqlCommand::ExplainAlter(query)),
2018 other => Err(ParseError::new(
2019 format!("internal: EXPLAIN produced unexpected query kind {other:?}"),
2020 self.position(),
2021 )),
2022 }
2023 }
2024 }
2025 Token::Create => self.parse_create_command(),
2026 Token::Drop => {
2027 let pos = self.position();
2028 self.advance()?;
2029
2030 let materialized = self.consume(&Token::Materialized)?;
2032 if self.check(&Token::View) {
2033 self.advance()?;
2034 let if_exists = self.match_if_exists()?;
2035 let name = self.expect_ident()?;
2036 return Ok(SqlCommand::DropView(DropViewQuery {
2037 name,
2038 materialized,
2039 if_exists,
2040 }));
2041 }
2042 if materialized {
2043 return Err(ParseError::expected(
2044 vec!["VIEW"],
2045 self.peek(),
2046 self.position(),
2047 ));
2048 }
2049
2050 if self.check(&Token::Index) {
2051 match self.parse_drop_index_query()? {
2052 QueryExpr::DropIndex(query) => Ok(SqlCommand::DropIndex(query)),
2053 other => Err(ParseError::new(
2054 format!("internal: DROP INDEX produced unexpected kind {other:?}"),
2055 self.position(),
2056 )),
2057 }
2058 } else if self.check(&Token::Table) {
2059 self.expect(Token::Table)?;
2060 match self.parse_drop_table_body()? {
2061 QueryExpr::DropTable(query) => Ok(SqlCommand::DropTable(query)),
2062 other => Err(ParseError::new(
2063 format!("internal: DROP TABLE produced unexpected kind {other:?}"),
2064 self.position(),
2065 )),
2066 }
2067 } else if self.check(&Token::Graph) {
2068 self.advance()?;
2069 match self.parse_drop_graph_body()? {
2070 QueryExpr::DropGraph(query) => Ok(SqlCommand::DropGraph(query)),
2071 other => Err(ParseError::new(
2072 format!("internal: DROP GRAPH produced unexpected kind {other:?}"),
2073 self.position(),
2074 )),
2075 }
2076 } else if self.check(&Token::Vector) {
2077 self.advance()?;
2078 match self.parse_drop_vector_body()? {
2079 QueryExpr::DropVector(query) => Ok(SqlCommand::DropVector(query)),
2080 other => Err(ParseError::new(
2081 format!("internal: DROP VECTOR produced unexpected kind {other:?}"),
2082 self.position(),
2083 )),
2084 }
2085 } else if self.check(&Token::Document) {
2086 self.advance()?;
2087 match self.parse_drop_document_body()? {
2088 QueryExpr::DropDocument(query) => Ok(SqlCommand::DropDocument(query)),
2089 other => Err(ParseError::new(
2090 format!("internal: DROP DOCUMENT produced unexpected kind {other:?}"),
2091 self.position(),
2092 )),
2093 }
2094 } else if self.check(&Token::Kv) {
2095 self.advance()?;
2096 match self.parse_drop_kv_body()? {
2097 QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
2098 other => Err(ParseError::new(
2099 format!("internal: DROP KV produced unexpected kind {other:?}"),
2100 self.position(),
2101 )),
2102 }
2103 } else if self.consume_ident_ci("CONFIG")? {
2104 match self.parse_drop_keyed_body(CollectionModel::Config)? {
2105 QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
2106 other => Err(ParseError::new(
2107 format!("internal: DROP CONFIG produced unexpected kind {other:?}"),
2108 self.position(),
2109 )),
2110 }
2111 } else if self.consume_ident_ci("VAULT")? {
2112 match self.parse_drop_keyed_body(CollectionModel::Vault)? {
2113 QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
2114 other => Err(ParseError::new(
2115 format!("internal: DROP VAULT produced unexpected kind {other:?}"),
2116 self.position(),
2117 )),
2118 }
2119 } else if self.check(&Token::Collection) {
2120 self.advance()?;
2121 match self.parse_drop_collection_body()? {
2122 QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
2123 other => Err(ParseError::new(
2124 format!("internal: DROP COLLECTION produced unexpected kind {other:?}"),
2125 self.position(),
2126 )),
2127 }
2128 } else if self.check(&Token::Timeseries) {
2129 self.advance()?;
2130 match self.parse_drop_timeseries_body()? {
2131 QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
2132 other => Err(ParseError::new(
2133 format!("internal: DROP TIMESERIES produced unexpected kind {other:?}"),
2134 self.position(),
2135 )),
2136 }
2137 } else if self.consume_ident_ci("METRICS")? {
2138 match self.parse_drop_collection_model_body(Some(CollectionModel::Metrics))? {
2139 QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
2140 other => Err(ParseError::new(
2141 format!("internal: DROP METRICS produced unexpected kind {other:?}"),
2142 self.position(),
2143 )),
2144 }
2145 } else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
2146 {
2147 self.advance()?;
2151 match self.parse_drop_timeseries_body()? {
2152 QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
2153 other => Err(ParseError::new(
2154 format!("internal: DROP HYPERTABLE produced unexpected kind {other:?}"),
2155 self.position(),
2156 )),
2157 }
2158 } else if self.check(&Token::Queue) {
2159 self.advance()?;
2160 match self.parse_drop_queue_body()? {
2161 QueryExpr::DropQueue(query) => Ok(SqlCommand::DropQueue(query)),
2162 other => Err(ParseError::new(
2163 format!("internal: DROP QUEUE produced unexpected kind {other:?}"),
2164 self.position(),
2165 )),
2166 }
2167 } else if self.check(&Token::Tree) {
2168 self.advance()?;
2169 match self.parse_drop_tree_body()? {
2170 QueryExpr::DropTree(query) => Ok(SqlCommand::DropTree(query)),
2171 other => Err(ParseError::new(
2172 format!("internal: DROP TREE produced unexpected kind {other:?}"),
2173 self.position(),
2174 )),
2175 }
2176 } else if matches!(self.peek(), Token::Ident(n) if
2177 n.eq_ignore_ascii_case("HLL") ||
2178 n.eq_ignore_ascii_case("SKETCH") ||
2179 n.eq_ignore_ascii_case("FILTER"))
2180 {
2181 match self.parse_drop_probabilistic()? {
2182 QueryExpr::ProbabilisticCommand(command) => {
2183 Ok(SqlCommand::Probabilistic(command))
2184 }
2185 other => Err(ParseError::new(
2186 format!(
2187 "internal: DROP probabilistic produced unexpected kind {other:?}"
2188 ),
2189 self.position(),
2190 )),
2191 }
2192 } else if self.check(&Token::Schema) {
2193 self.advance()?;
2195 let if_exists = self.match_if_exists()?;
2196 let name = self.expect_ident()?;
2197 let cascade = self.consume(&Token::Cascade)?;
2198 Ok(SqlCommand::DropSchema(DropSchemaQuery {
2199 name,
2200 if_exists,
2201 cascade,
2202 }))
2203 } else if self.check(&Token::Policy) {
2204 self.advance()?;
2208 if matches!(self.peek(), Token::String(_)) {
2209 let expr = self.parse_drop_iam_policy_after_keywords()?;
2210 return Ok(SqlCommand::IamPolicy(expr));
2211 }
2212 let if_exists = self.match_if_exists()?;
2213 let name = self.expect_ident()?;
2214 self.expect(Token::On)?;
2215 let table = self.expect_ident()?;
2216 Ok(SqlCommand::DropPolicy(DropPolicyQuery {
2217 name,
2218 table,
2219 if_exists,
2220 }))
2221 } else if self.check(&Token::Server) {
2222 self.advance()?;
2224 let if_exists = self.match_if_exists()?;
2225 let name = self.expect_ident()?;
2226 let cascade = self.consume(&Token::Cascade)?;
2227 Ok(SqlCommand::DropServer(DropServerQuery {
2228 name,
2229 if_exists,
2230 cascade,
2231 }))
2232 } else if self.check(&Token::Foreign) {
2233 self.advance()?;
2235 self.expect(Token::Table)?;
2236 let if_exists = self.match_if_exists()?;
2237 let name = self.expect_ident()?;
2238 Ok(SqlCommand::DropForeignTable(DropForeignTableQuery {
2239 name,
2240 if_exists,
2241 }))
2242 } else if self.check(&Token::Sequence) {
2243 self.advance()?;
2245 let if_exists = self.match_if_exists()?;
2246 let name = self.expect_ident()?;
2247 Ok(SqlCommand::DropSequence(DropSequenceQuery {
2248 name,
2249 if_exists,
2250 }))
2251 } else if let Some(err) =
2252 ParseError::unsupported_recognized_token(self.peek(), self.position())
2253 {
2254 Err(err)
2255 } else {
2256 Err(ParseError::expected(
2257 vec![
2258 "TABLE",
2259 "INDEX",
2260 "TIMESERIES",
2261 "QUEUE",
2262 "TREE",
2263 "HLL",
2264 "SKETCH",
2265 "FILTER",
2266 "SCHEMA",
2267 "SEQUENCE",
2268 ],
2269 self.peek(),
2270 pos,
2271 ))
2272 }
2273 }
2274 Token::Alter => {
2275 let next = self.peek_next()?.clone();
2280 if matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("USER")) {
2281 self.advance()?; let stmt = self.parse_alter_user_statement()?;
2283 Ok(SqlCommand::AlterUser(stmt))
2284 } else if matches!(next, Token::Queue) {
2285 self.advance()?; self.advance()?; match self.parse_alter_queue_body()? {
2288 QueryExpr::AlterQueue(query) => Ok(SqlCommand::AlterQueue(query)),
2289 other => Err(ParseError::new(
2290 format!("internal: ALTER QUEUE produced unexpected kind {other:?}"),
2291 self.position(),
2292 )),
2293 }
2294 } else if matches!(next, Token::Metric) {
2295 self.advance()?; self.advance()?; match self.parse_alter_metric_body()? {
2298 QueryExpr::AlterMetric(query) => Ok(SqlCommand::AlterMetric(query)),
2299 other => Err(ParseError::new(
2300 format!("internal: ALTER METRIC produced unexpected kind {other:?}"),
2301 self.position(),
2302 )),
2303 }
2304 } else if matches!(next, Token::Graph) {
2305 match self.parse_alter_graph_query()? {
2309 QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2310 other => Err(ParseError::new(
2311 format!(
2312 "internal: ALTER GRAPH produced unexpected query kind {other:?}"
2313 ),
2314 self.position(),
2315 )),
2316 }
2317 } else if matches!(next, Token::Table)
2318 || matches!(next, Token::Collection)
2319 || matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("COLLECTION"))
2320 {
2321 match self.parse_alter_table_query()? {
2327 QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2328 other => Err(ParseError::new(
2329 format!(
2330 "internal: ALTER TABLE produced unexpected query kind {other:?}"
2331 ),
2332 self.position(),
2333 )),
2334 }
2335 } else if let Some(err) =
2336 ParseError::unsupported_recognized_token(&next, self.position())
2337 {
2338 Err(err)
2339 } else {
2340 match self.parse_alter_table_query()? {
2341 QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
2342 other => Err(ParseError::new(
2343 format!("internal: ALTER produced unexpected query kind {other:?}"),
2344 self.position(),
2345 )),
2346 }
2347 }
2348 }
2349 Token::Ident(name) if name.eq_ignore_ascii_case("GRANT") => {
2350 let stmt = self.parse_grant_statement()?;
2351 Ok(SqlCommand::Grant(stmt))
2352 }
2353 Token::Ident(name) if name.eq_ignore_ascii_case("REVOKE") => {
2354 let stmt = self.parse_revoke_statement()?;
2355 Ok(SqlCommand::Revoke(stmt))
2356 }
2357 Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
2358 self.advance()?;
2359 if self.consume_ident_ci("BACKFILL")? {
2360 return Err(ParseError::new(
2361 "EVENTS BACKFILL STATUS is not implemented; EVENTS BACKFILL runtime is available but durable progress tracking is not"
2362 .to_string(),
2363 self.position(),
2364 ));
2365 }
2366 if !self.consume_ident_ci("STATUS")? {
2367 return Err(ParseError::expected(
2368 vec!["STATUS"],
2369 self.peek(),
2370 self.position(),
2371 ));
2372 }
2373
2374 let mut query = TableQuery::new("red.subscriptions");
2375 let collection = match self.peek().clone() {
2376 Token::Ident(name) => {
2377 self.advance()?;
2378 Some(name)
2379 }
2380 Token::String(name) => {
2381 self.advance()?;
2382 Some(name)
2383 }
2384 _ => None,
2385 };
2386 self.parse_table_clauses(&mut query)?;
2387 if let Some(collection) = collection {
2388 let filter = Filter::compare(
2389 FieldRef::column("red.subscriptions", "collection"),
2390 CompareOp::Eq,
2391 Value::text(collection),
2392 );
2393 let expr = filter_to_expr(&filter);
2394 query.where_expr = Some(match query.where_expr.take() {
2395 Some(existing) => Expr::binop(BinOp::And, existing, expr),
2396 None => expr,
2397 });
2398 query.filter = Some(match query.filter.take() {
2399 Some(existing) => existing.and(filter),
2400 None => filter,
2401 });
2402 }
2403 Ok(SqlCommand::Select(query))
2404 }
2405 Token::Attach => {
2406 let expr = self.parse_attach_policy()?;
2407 Ok(SqlCommand::IamPolicy(expr))
2408 }
2409 Token::Detach => {
2410 let expr = self.parse_detach_policy()?;
2411 Ok(SqlCommand::IamPolicy(expr))
2412 }
2413 Token::Ident(name) if name.eq_ignore_ascii_case("SIMULATE") => {
2414 let expr = self.parse_simulate_policy()?;
2415 Ok(SqlCommand::IamPolicy(expr))
2416 }
2417 Token::Ident(name) if name.eq_ignore_ascii_case("LINT") => {
2418 let expr = self.parse_lint_policy()?;
2419 Ok(SqlCommand::IamPolicy(expr))
2420 }
2421 Token::Ident(name) if name.eq_ignore_ascii_case("MIGRATE") => {
2422 let next = self.peek_next()?.clone();
2426 let is_policy_mode = matches!(&next, Token::Policy)
2427 || matches!(&next, Token::Ident(name)
2428 if name.eq_ignore_ascii_case("POLICY"));
2429 if is_policy_mode {
2430 let expr = self.parse_migrate_policy_mode()?;
2431 return Ok(SqlCommand::IamPolicy(expr));
2432 }
2433 Err(ParseError::expected(
2434 vec!["POLICY"],
2435 self.peek(),
2436 self.position(),
2437 ))
2438 }
2439 Token::Set => {
2440 self.advance()?;
2441 if self.consume_ident_ci("CONFIG")? {
2442 let full_key = self.parse_dotted_admin_path(true)?;
2443 self.expect(Token::Eq)?;
2444 let value = self.parse_literal_value()?;
2445 Ok(SqlCommand::SetConfig {
2446 key: full_key,
2447 value,
2448 })
2449 } else if self.consume_ident_ci("SECRET")? {
2450 let key = self.parse_dotted_admin_path(true)?;
2451 self.expect(Token::Eq)?;
2452 let value = self.parse_literal_value()?;
2453 Ok(SqlCommand::SetSecret { key, value })
2454 } else if self.consume_ident_ci("TENANT")? {
2455 let _ = self.consume(&Token::Eq)?;
2458 if self.consume_ident_ci("NULL")? {
2459 Ok(SqlCommand::SetTenant(None))
2460 } else {
2461 let value = self.parse_literal_value()?;
2462 match value {
2463 Value::Text(s) => Ok(SqlCommand::SetTenant(Some(s.to_string()))),
2464 Value::Null => Ok(SqlCommand::SetTenant(None)),
2465 other => Err(ParseError::new(
2466 format!("SET TENANT expects a text literal or NULL, got {other:?}"),
2467 self.position(),
2468 )),
2469 }
2470 }
2471 } else {
2472 Err(ParseError::expected(
2473 vec!["CONFIG", "SECRET", "TENANT"],
2474 self.peek(),
2475 self.position(),
2476 ))
2477 }
2478 }
2479 Token::Ident(name) if name.eq_ignore_ascii_case("APPLY") => {
2480 self.advance()?;
2481 match self.parse_apply_migration()? {
2482 QueryExpr::ApplyMigration(q) => Ok(SqlCommand::ApplyMigration(q)),
2483 other => Err(ParseError::new(
2484 format!("internal: APPLY MIGRATION produced unexpected kind {other:?}"),
2485 self.position(),
2486 )),
2487 }
2488 }
2489 Token::Ident(name) if name.eq_ignore_ascii_case("RESET") => {
2490 self.advance()?;
2492 if self.consume_ident_ci("TENANT")? {
2493 Ok(SqlCommand::SetTenant(None))
2494 } else {
2495 Err(ParseError::expected(
2496 vec!["TENANT"],
2497 self.peek(),
2498 self.position(),
2499 ))
2500 }
2501 }
2502 Token::Ident(name)
2503 if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
2504 {
2505 self.advance()?;
2506 let collection = self.parse_dotted_admin_path(false)?;
2507 let mut query = TableQuery::new("red.describe");
2508 query.filter = Some(Filter::compare(
2509 FieldRef::column("", "collection"),
2510 CompareOp::Eq,
2511 Value::text(collection),
2512 ));
2513 Ok(SqlCommand::Select(query))
2514 }
2515 Token::Desc => {
2516 self.advance()?;
2517 let collection = self.parse_dotted_admin_path(false)?;
2518 let mut query = TableQuery::new("red.describe");
2519 query.filter = Some(Filter::compare(
2520 FieldRef::column("", "collection"),
2521 CompareOp::Eq,
2522 Value::text(collection),
2523 ));
2524 Ok(SqlCommand::Select(query))
2525 }
2526 Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
2527 self.advance()?;
2528 if self.consume(&Token::Create)? || self.consume_ident_ci("CREATE")? {
2529 if !(self.consume(&Token::Table)? || self.consume_ident_ci("TABLE")?) {
2530 return Err(ParseError::expected(
2531 vec!["TABLE"],
2532 self.peek(),
2533 self.position(),
2534 ));
2535 }
2536 let collection = self.parse_dotted_admin_path(false)?;
2537 let mut query = TableQuery::new("red.show_create");
2538 query.filter = Some(Filter::compare(
2539 FieldRef::column("", "collection"),
2540 CompareOp::Eq,
2541 Value::text(collection),
2542 ));
2543 Ok(SqlCommand::Select(query))
2544 } else if self.consume_ident_ci("CONFIG")? {
2545 let prefix = if !self.check(&Token::Eof) {
2549 let first = self.expect_ident()?;
2550 let mut full = first;
2551 while self.consume(&Token::Dot)? {
2552 let next = self.expect_ident_or_keyword()?;
2553 full = format!("{full}.{next}");
2554 }
2555 Some(full.to_ascii_lowercase())
2558 } else {
2559 None
2560 };
2561 Ok(SqlCommand::ShowConfig { prefix })
2562 } else if self.consume_ident_ci("COLLECTIONS")? {
2563 let mut query = TableQuery::new("red.collections");
2564 let include_internal = if self.consume_ident_ci("INCLUDING")? {
2565 if !self.consume_ident_ci("INTERNAL")? {
2566 return Err(ParseError::expected(
2567 vec!["INTERNAL"],
2568 self.peek(),
2569 self.position(),
2570 ));
2571 }
2572 true
2573 } else {
2574 false
2575 };
2576 self.parse_table_clauses(&mut query)?;
2577 if !include_internal {
2578 let user_filter = query.filter.take();
2579 let hide_internal = crate::storage::query::ast::Filter::Compare {
2580 field: FieldRef::column("", "internal"),
2581 op: CompareOp::Eq,
2582 value: Value::Boolean(false),
2583 };
2584 query.filter = Some(match user_filter {
2585 Some(filter) => filter.and(hide_internal),
2586 None => hide_internal,
2587 });
2588 }
2589 Ok(SqlCommand::Select(query))
2590 } else if self.consume_ident_ci("TABLES")? {
2591 Ok(SqlCommand::Select(parse_show_collections_by_model(
2592 self, "table",
2593 )?))
2594 } else if self.consume_ident_ci("QUEUES")? {
2595 let mut query = TableQuery::new("red.queues");
2603 let include_internal = if self.consume_ident_ci("INCLUDING")? {
2604 if !self.consume_ident_ci("INTERNAL")? {
2605 return Err(ParseError::expected(
2606 vec!["INTERNAL"],
2607 self.peek(),
2608 self.position(),
2609 ));
2610 }
2611 true
2612 } else {
2613 false
2614 };
2615 self.parse_table_clauses(&mut query)?;
2616 if !include_internal {
2617 let hide_internal = Filter::Compare {
2618 field: FieldRef::column("", "internal"),
2619 op: CompareOp::Eq,
2620 value: Value::Boolean(false),
2621 };
2622 add_table_filter(&mut query, hide_internal);
2623 }
2624 Ok(SqlCommand::Select(query))
2625 } else if self.consume(&Token::Vectors)? || self.consume_ident_ci("VECTORS")? {
2626 Ok(SqlCommand::Select(parse_show_collections_by_model(
2627 self, "vector",
2628 )?))
2629 } else if self.consume_ident_ci("DOCUMENTS")? {
2630 Ok(SqlCommand::Select(parse_show_collections_by_model(
2631 self, "document",
2632 )?))
2633 } else if self.consume(&Token::Timeseries)?
2634 || self.consume_ident_ci("TIMESERIES")?
2635 {
2636 Ok(SqlCommand::Select(parse_show_collections_by_model(
2637 self,
2638 "timeseries",
2639 )?))
2640 } else if self.consume_ident_ci("GRAPHS")? {
2641 Ok(SqlCommand::Select(parse_show_collections_by_model(
2642 self, "graph",
2643 )?))
2644 } else if self.consume_ident_ci("CONFIGS")? {
2645 Ok(SqlCommand::Select(parse_show_collections_by_model(
2646 self, "config",
2647 )?))
2648 } else if self.consume_ident_ci("VAULTS")? {
2649 Ok(SqlCommand::Select(parse_show_collections_by_model(
2650 self, "vault",
2651 )?))
2652 } else if self.consume(&Token::Kv)?
2653 || self.consume_ident_ci("KV")?
2654 || self.consume_ident_ci("KVS")?
2655 {
2656 Ok(SqlCommand::Select(parse_show_collections_by_model(
2657 self, "kv",
2658 )?))
2659 } else if self.consume(&Token::Schema)? || self.consume_ident_ci("SCHEMA")? {
2660 let collection = self.parse_dotted_admin_path(false)?;
2661 let mut query = TableQuery::new("red.columns");
2662 query.filter = Some(Filter::compare(
2663 FieldRef::column("", "collection"),
2664 CompareOp::Eq,
2665 Value::text(collection),
2666 ));
2667 Ok(SqlCommand::Select(query))
2668 } else if self.consume_ident_ci("INDICES")? || self.consume_ident_ci("INDEXES")? {
2669 let mut query = TableQuery::new("red.show_indexes");
2670 if self.consume(&Token::On)? {
2671 let collection = self.expect_ident_or_keyword()?;
2672 let filter = Filter::Compare {
2673 field: FieldRef::column("", "table"),
2674 op: CompareOp::Eq,
2675 value: Value::text(collection),
2676 };
2677 query.where_expr = Some(filter_to_expr(&filter));
2678 query.filter = Some(filter);
2679 }
2680 self.parse_table_clauses(&mut query)?;
2681 Ok(SqlCommand::Select(query))
2682 } else if self.consume_ident_ci("POLICIES")? {
2683 if self.consume(&Token::For)? || self.consume_ident_ci("FOR")? {
2684 let principal = self.parse_iam_principal_kind()?;
2685 return Ok(SqlCommand::IamPolicy(QueryExpr::ShowPolicies {
2686 filter: Some(principal),
2687 }));
2688 }
2689 let mut query = TableQuery::new("red.policies");
2690 let collection_filter =
2691 if self.consume(&Token::On)? || self.consume_ident_ci("ON")? {
2692 let collection = self.parse_dotted_admin_path(false)?;
2693 Some(Filter::Compare {
2694 field: FieldRef::TableColumn {
2695 table: String::new(),
2696 column: "collection".to_string(),
2697 },
2698 op: CompareOp::Eq,
2699 value: Value::text(collection),
2700 })
2701 } else {
2702 None
2703 };
2704 self.parse_table_clauses(&mut query)?;
2705 if let Some(collection_filter) = collection_filter {
2706 let combined = match query.filter.take() {
2707 Some(existing) => {
2708 Filter::And(Box::new(collection_filter), Box::new(existing))
2709 }
2710 None => collection_filter,
2711 };
2712 query.where_expr = Some(filter_to_expr(&combined));
2713 query.filter = Some(combined);
2714 }
2715 Ok(SqlCommand::Select(query))
2716 } else if self.consume_ident_ci("STATS")? {
2717 let mut query = TableQuery::new("red.stats");
2718 let collection = match self.peek().clone() {
2719 Token::Ident(name) => {
2720 self.advance()?;
2721 Some(name)
2722 }
2723 Token::String(name) => {
2724 self.advance()?;
2725 Some(name)
2726 }
2727 _ => None,
2728 };
2729 self.parse_table_clauses(&mut query)?;
2730 if let Some(collection) = collection {
2731 let filter = Filter::compare(
2732 FieldRef::column("red.stats", "collection"),
2733 CompareOp::Eq,
2734 Value::text(collection),
2735 );
2736 let expr = filter_to_expr(&filter);
2737 query.where_expr = Some(match query.where_expr.take() {
2738 Some(existing) => Expr::binop(BinOp::And, existing, expr),
2739 None => expr,
2740 });
2741 query.filter = Some(match query.filter.take() {
2742 Some(existing) => existing.and(filter),
2743 None => filter,
2744 });
2745 }
2746 Ok(SqlCommand::Select(query))
2747 } else if self.consume_ident_ci("SAMPLE")? {
2748 let mut query = TableQuery::new(&self.expect_ident()?);
2749 query.limit = if self.consume(&Token::Limit)? {
2750 Some(self.parse_integer()? as u64)
2751 } else {
2752 Some(10)
2753 };
2754 Ok(SqlCommand::Select(query))
2755 } else if self.consume_ident_ci("SECRET")? || self.consume_ident_ci("SECRETS")? {
2756 let prefix = if !self.check(&Token::Eof) {
2757 Some(self.parse_dotted_admin_path(true)?)
2758 } else {
2759 None
2760 };
2761 Ok(SqlCommand::ShowSecrets { prefix })
2762 } else if self.consume_ident_ci("TENANT")? {
2763 Ok(SqlCommand::ShowTenant)
2764 } else if let Some(expr) = self.parse_show_iam_after_show()? {
2765 Ok(SqlCommand::IamPolicy(expr))
2766 } else {
2767 Err(ParseError::expected(
2768 vec![
2769 "CONFIG",
2770 "SECRET",
2771 "SECRETS",
2772 "COLLECTIONS",
2773 "TABLES",
2774 "QUEUES",
2775 "VECTORS",
2776 "DOCUMENTS",
2777 "TIMESERIES",
2778 "GRAPHS",
2779 "KV",
2780 "SCHEMA",
2781 "INDICES",
2782 "INDEXES",
2783 "SAMPLE",
2784 "POLICIES",
2785 "STATS",
2786 "TENANT",
2787 "EFFECTIVE",
2788 ],
2789 self.peek(),
2790 self.position(),
2791 ))
2792 }
2793 }
2794 Token::Begin | Token::Start => {
2806 self.advance()?;
2807 let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2808 if self.consume_ident_ci("ISOLATION")? {
2810 self.expect(Token::Level)?;
2811 let mut parts: Vec<String> = Vec::new();
2815 if self.consume_ident_ci("READ")? {
2816 parts.push("READ".to_string());
2817 if self.consume_ident_ci("UNCOMMITTED")? {
2818 parts.push("UNCOMMITTED".to_string());
2819 } else if self.consume_ident_ci("COMMITTED")? {
2820 parts.push("COMMITTED".to_string());
2821 } else {
2822 return Err(ParseError::expected(
2823 vec!["UNCOMMITTED", "COMMITTED"],
2824 self.peek(),
2825 self.position(),
2826 ));
2827 }
2828 } else if self.consume_ident_ci("REPEATABLE")? {
2829 parts.push("REPEATABLE".to_string());
2830 if !self.consume_ident_ci("READ")? {
2831 return Err(ParseError::expected(
2832 vec!["READ"],
2833 self.peek(),
2834 self.position(),
2835 ));
2836 }
2837 parts.push("READ".to_string());
2838 } else if self.consume_ident_ci("SNAPSHOT")? {
2839 parts.push("SNAPSHOT".to_string());
2840 } else if self.consume_ident_ci("SERIALIZABLE")? {
2841 return Err(ParseError::new(
2842 "ISOLATION LEVEL SERIALIZABLE is not yet supported — reddb \
2843 currently provides SNAPSHOT ISOLATION (which PG calls \
2844 REPEATABLE READ). Use REPEATABLE READ / SNAPSHOT / \
2845 READ COMMITTED, or omit ISOLATION LEVEL for the default."
2846 .to_string(),
2847 self.position(),
2848 ));
2849 } else {
2850 return Err(ParseError::expected(
2851 vec!["READ", "REPEATABLE", "SNAPSHOT", "SERIALIZABLE"],
2852 self.peek(),
2853 self.position(),
2854 ));
2855 }
2856 let _ = parts;
2858 }
2859 Ok(SqlCommand::TransactionControl(TxnControl::Begin))
2860 }
2861 Token::Commit => {
2863 self.advance()?;
2864 let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2865 Ok(SqlCommand::TransactionControl(TxnControl::Commit))
2866 }
2867 Token::Rollback => {
2870 self.advance()?;
2871 if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
2872 match self.parse_rollback_migration_after_keyword()? {
2873 QueryExpr::RollbackMigration(q) => Ok(SqlCommand::RollbackMigration(q)),
2874 other => Err(ParseError::new(
2875 format!(
2876 "internal: ROLLBACK MIGRATION produced unexpected kind {other:?}"
2877 ),
2878 self.position(),
2879 )),
2880 }
2881 } else {
2882 let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
2883 if self.consume(&Token::To)? {
2884 let _ = self.consume(&Token::Savepoint)?;
2885 let name = self.expect_ident()?;
2886 Ok(SqlCommand::TransactionControl(
2887 TxnControl::RollbackToSavepoint(name),
2888 ))
2889 } else {
2890 Ok(SqlCommand::TransactionControl(TxnControl::Rollback))
2891 }
2892 }
2893 }
2894 Token::Savepoint => {
2896 self.advance()?;
2897 let name = self.expect_ident()?;
2898 Ok(SqlCommand::TransactionControl(TxnControl::Savepoint(name)))
2899 }
2900 Token::Release => {
2902 self.advance()?;
2903 let _ = self.consume(&Token::Savepoint)?;
2904 let name = self.expect_ident()?;
2905 Ok(SqlCommand::TransactionControl(
2906 TxnControl::ReleaseSavepoint(name),
2907 ))
2908 }
2909 Token::Vacuum => {
2911 self.advance()?;
2912 let full = self.consume(&Token::Full)?;
2913 let target = if self.check(&Token::Eof) {
2914 None
2915 } else {
2916 Some(self.expect_ident()?)
2917 };
2918 Ok(SqlCommand::Maintenance(MaintenanceCommand::Vacuum {
2919 target,
2920 full,
2921 }))
2922 }
2923 Token::Refresh => {
2925 self.advance()?;
2926 self.expect(Token::Materialized)?;
2927 self.expect(Token::View)?;
2928 let name = self.expect_ident()?;
2929 Ok(SqlCommand::RefreshMaterializedView(
2930 RefreshMaterializedViewQuery { name },
2931 ))
2932 }
2933 Token::Analyze => {
2935 self.advance()?;
2936 let target = if self.check(&Token::Eof) {
2937 None
2938 } else {
2939 Some(self.expect_ident()?)
2940 };
2941 Ok(SqlCommand::Maintenance(MaintenanceCommand::Analyze {
2942 target,
2943 }))
2944 }
2945 Token::Copy => {
2951 self.advance()?;
2952 let table = self.expect_ident()?;
2953 self.expect(Token::From)?;
2954 let path = self.parse_string()?;
2955
2956 let mut delimiter: Option<char> = None;
2957 let mut has_header = false;
2958 let format = CopyFormat::Csv;
2959
2960 if self.consume(&Token::With)? || self.consume_ident_ci("WITH")? {
2964 self.expect(Token::LParen)?;
2965 loop {
2966 if self.consume(&Token::Format)? || self.consume_ident_ci("FORMAT")? {
2967 let _ = self.consume(&Token::Eq)?;
2968 let _ = self.expect_ident()?;
2970 } else if self.consume(&Token::Header)? {
2971 let _ = self.consume(&Token::Eq)?;
2972 has_header = match self.peek().clone() {
2975 Token::True => {
2976 self.advance()?;
2977 true
2978 }
2979 Token::False => {
2980 self.advance()?;
2981 false
2982 }
2983 Token::Ident(ref n) if n.eq_ignore_ascii_case("true") => {
2984 self.advance()?;
2985 true
2986 }
2987 Token::Ident(ref n) if n.eq_ignore_ascii_case("false") => {
2988 self.advance()?;
2989 false
2990 }
2991 _ => true,
2992 };
2993 } else if self.consume(&Token::Delimiter)? {
2994 let _ = self.consume(&Token::Eq)?;
2995 let s = self.parse_string()?;
2996 delimiter = s.chars().next();
2997 } else {
2998 break;
2999 }
3000 if !self.consume(&Token::Comma)? {
3001 break;
3002 }
3003 }
3004 self.expect(Token::RParen)?;
3005 }
3006
3007 loop {
3009 if self.consume(&Token::Delimiter)? {
3010 let s = self.parse_string()?;
3011 delimiter = s.chars().next();
3012 } else if self.consume(&Token::Header)? {
3013 has_header = true;
3014 } else {
3015 break;
3016 }
3017 }
3018
3019 Ok(SqlCommand::CopyFrom(CopyFromQuery {
3020 table,
3021 path,
3022 format,
3023 delimiter,
3024 has_header,
3025 }))
3026 }
3027 other => Err(ParseError::expected(
3028 vec![
3029 "SELECT",
3030 "FROM",
3031 "INSERT",
3032 "UPDATE",
3033 "DELETE",
3034 "EXPLAIN",
3035 "CREATE",
3036 "DROP",
3037 "ALTER",
3038 "SET",
3039 "SHOW",
3040 "BEGIN",
3041 "COMMIT",
3042 "ROLLBACK",
3043 "SAVEPOINT",
3044 "RELEASE",
3045 "START",
3046 "VACUUM",
3047 "ANALYZE",
3048 "COPY",
3049 "REFRESH",
3050 "DESCRIBE",
3051 "DESC",
3052 ],
3053 other,
3054 self.position(),
3055 )),
3056 }
3057 }
3058}