use crate::ast::{
AlterMetricQuery, AlterQueueQuery, AlterTableQuery, AlterUserStmt, ApplyMigrationQuery,
AskQuery, BinOp, CompareOp, ConfigCommand, CopyFormat, CopyFromQuery, CreateCollectionQuery,
CreateForeignTableQuery, CreateIndexQuery, CreateMetricQuery, CreateMigrationQuery,
CreatePolicyQuery, CreateQueueQuery, CreateSchemaQuery, CreateSequenceQuery, CreateServerQuery,
CreateSloQuery, CreateTableQuery, CreateTimeSeriesQuery, CreateTreeQuery, CreateUserStmt,
CreateVectorQuery, CreateViewQuery, DeleteQuery, DropCollectionQuery, DropDocumentQuery,
DropForeignTableQuery, DropGraphQuery, DropIndexQuery, DropKvQuery, DropPolicyQuery,
DropQueueQuery, DropSchemaQuery, DropSequenceQuery, DropServerQuery, DropTableQuery,
DropTimeSeriesQuery, DropTreeQuery, DropVectorQuery, DropViewQuery, EventsBackfillQuery,
ExplainAlterQuery, ExplainMigrationQuery, Expr, FieldRef, Filter, ForeignColumnDef, GrantStmt,
GraphCommand, GraphQuery, HybridQuery, InsertQuery, JoinQuery, KvCommand, MaintenanceCommand,
PathQuery, PolicyAction, ProbabilisticCommand, QueryExpr, QueueCommand, QueueSelectQuery,
RankOfQuery, RankRangeQuery, RefreshMaterializedViewQuery, RevokeStmt, RollbackMigrationQuery,
SearchCommand, Span, TableQuery, TreeCommand, TruncateQuery, TxnControl, UpdateQuery,
VectorQuery,
};
use crate::lexer::Token;
use crate::parser::{ParseError, Parser, SafeTokenDisplay};
use crate::sql_lowering::filter_to_expr;
use reddb_types::catalog::CollectionModel;
use reddb_types::types::Value;
#[derive(Debug, Clone)]
pub enum SqlStatement {
Query(SqlQuery),
Mutation(SqlMutation),
Schema(SqlSchemaCommand),
Admin(SqlAdminCommand),
}
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum FrontendStatement {
Sql(SqlStatement),
Graph(GraphQuery),
GraphCommand(GraphCommand),
Path(PathQuery),
Vector(VectorQuery),
Hybrid(HybridQuery),
Search(SearchCommand),
Ask(AskQuery),
QueueSelect(QueueSelectQuery),
QueueCommand(QueueCommand),
EventsBackfill(EventsBackfillQuery),
EventsBackfillStatus { collection: String },
TreeCommand(TreeCommand),
ProbabilisticCommand(ProbabilisticCommand),
KvCommand(KvCommand),
ConfigCommand(ConfigCommand),
Ranking(QueryExpr),
}
#[derive(Debug, Clone)]
pub enum SqlCommand {
Select(TableQuery),
Join(JoinQuery),
Insert(InsertQuery),
Update(UpdateQuery),
Delete(DeleteQuery),
ExplainAlter(ExplainAlterQuery),
CreateTable(CreateTableQuery),
CreateCollection(CreateCollectionQuery),
CreateVector(CreateVectorQuery),
DropTable(DropTableQuery),
DropGraph(DropGraphQuery),
DropVector(DropVectorQuery),
DropDocument(DropDocumentQuery),
DropKv(DropKvQuery),
DropCollection(DropCollectionQuery),
Truncate(TruncateQuery),
AlterTable(AlterTableQuery),
CreateIndex(CreateIndexQuery),
DropIndex(DropIndexQuery),
CreateTimeSeries(CreateTimeSeriesQuery),
CreateMetric(CreateMetricQuery),
AlterMetric(AlterMetricQuery),
CreateSlo(CreateSloQuery),
DropTimeSeries(DropTimeSeriesQuery),
CreateQueue(CreateQueueQuery),
AlterQueue(AlterQueueQuery),
DropQueue(DropQueueQuery),
CreateTree(CreateTreeQuery),
DropTree(DropTreeQuery),
Probabilistic(ProbabilisticCommand),
SetConfig {
key: String,
value: Value,
},
ShowConfig {
prefix: Option<String>,
as_json: bool,
},
SetSecret {
key: String,
value: Value,
},
DeleteSecret {
key: String,
},
ShowSecrets {
prefix: Option<String>,
},
SetTenant(Option<String>),
ShowTenant,
TransactionControl(TxnControl),
Maintenance(MaintenanceCommand),
CreateSchema(CreateSchemaQuery),
DropSchema(DropSchemaQuery),
CreateSequence(CreateSequenceQuery),
DropSequence(DropSequenceQuery),
CopyFrom(CopyFromQuery),
CreateView(CreateViewQuery),
DropView(DropViewQuery),
RefreshMaterializedView(RefreshMaterializedViewQuery),
CreatePolicy(CreatePolicyQuery),
DropPolicy(DropPolicyQuery),
CreateServer(CreateServerQuery),
DropServer(DropServerQuery),
CreateForeignTable(CreateForeignTableQuery),
DropForeignTable(DropForeignTableQuery),
Grant(GrantStmt),
Revoke(RevokeStmt),
AlterUser(AlterUserStmt),
CreateUser(CreateUserStmt),
IamPolicy(QueryExpr),
CreateMigration(CreateMigrationQuery),
ApplyMigration(ApplyMigrationQuery),
RollbackMigration(RollbackMigrationQuery),
ExplainMigration(ExplainMigrationQuery),
}
fn analytics_v0_non_goal_create(token: &Token) -> Option<String> {
let ident = match token {
Token::Ident(s) => s,
_ => return None,
};
let upper = ident.to_ascii_uppercase();
let message = match upper.as_str() {
"ANALYTICS" => {
"CREATE ANALYTICS is not supported in Analytics v0 — \
use CREATE METRIC <dotted.path> for the metric-centric \
catalog (PRD #782 non-goal)"
}
"EVENT" => {
"CREATE EVENT is not supported in Analytics v0 — \
event-shaped data lives in ordinary TABLE/DOCUMENT \
collections, not a new storage model (PRD #782 non-goal)"
}
"COHORT" => {
"CREATE COHORT is not supported in Analytics v0 — \
cohort surfaces are deferred (PRD #782 non-goal)"
}
"FUNNEL" => {
"CREATE FUNNEL is not supported in Analytics v0 — \
funnel surfaces are deferred (PRD #782 non-goal)"
}
"SLA" => {
"CREATE SLA is not supported in Analytics v0 — \
SLA/legal/commercial contract modeling is post-MVP \
(PRD #782 non-goal)"
}
"ADAPTER" => {
"CREATE ADAPTER is not supported in Analytics v0 — \
Prometheus/Grafana/Snowplow/Google Analytics adapters \
are deferred (PRD #782 non-goal)"
}
_ => return None,
};
Some(message.to_string())
}
fn collection_model_filter(model: &str) -> Filter {
Filter::Compare {
field: FieldRef::column("", "model"),
op: CompareOp::Eq,
value: Value::Text(model.to_string().into()),
}
}
#[cfg(test)]
mod tests {
use super::*;
use reddb_types::catalog::CollectionModel;
fn frontend(input: &str) -> FrontendStatement {
parse_frontend(input)
.unwrap_or_else(|err| panic!("failed to parse frontend {input:?}: {err:?}"))
}
fn expr(input: &str) -> QueryExpr {
frontend(input).into_query_expr()
}
fn sql_command(input: &str) -> SqlCommand {
sql_command_result(input)
.unwrap_or_else(|err| panic!("failed to parse SQL command {input:?}: {err:?}"))
}
fn sql_command_result(input: &str) -> Result<SqlCommand, ParseError> {
let mut parser = Parser::new(input)?;
parser.parse_sql_command()
}
fn assert_text(value: &Value, expected: &str) {
match value {
Value::Text(text) => assert_eq!(text.as_ref(), expected),
other => panic!("expected text {expected:?}, got {other:?}"),
}
}
#[test]
fn parse_frontend_routes_core_sql_statements() {
let FrontendStatement::Sql(SqlStatement::Query(SqlQuery::Select(query))) =
frontend("SELECT * FROM users")
else {
panic!("SELECT should route to SqlStatement::Query::Select");
};
assert_eq!(query.table, "users");
let QueryExpr::Insert(query) = expr("INSERT INTO users (id, name) VALUES (1, 'ada')")
else {
panic!("INSERT should lower through the SQL frontend");
};
assert_eq!(query.table, "users");
assert_eq!(query.columns, vec!["id", "name"]);
assert_eq!(query.values.len(), 1);
let QueryExpr::Update(query) = expr("UPDATE users SET name = 'ada' WHERE id = 1") else {
panic!("UPDATE should lower through the SQL frontend");
};
assert_eq!(query.table, "users");
assert_eq!(query.assignments[0].0, "name");
let QueryExpr::Delete(query) = expr("DELETE FROM users WHERE id = 1") else {
panic!("DELETE should lower through the SQL frontend");
};
assert_eq!(query.table, "users");
assert!(query.filter.is_some());
let QueryExpr::CreateTable(query) = expr("CREATE TABLE users (id INT, name TEXT)") else {
panic!("CREATE TABLE should lower through the SQL frontend");
};
assert_eq!(query.collection_model, CollectionModel::Table);
assert_eq!(query.name, "users");
assert_eq!(query.columns[0].name, "id");
let QueryExpr::DropTable(query) = expr("DROP TABLE IF EXISTS users") else {
panic!("DROP TABLE should lower through the SQL frontend");
};
assert_eq!(query.name, "users");
assert!(query.if_exists);
}
#[test]
fn parse_frontend_routes_admin_and_catalog_sql() {
let QueryExpr::Table(query) = expr("SHOW COLLECTIONS") else {
panic!("SHOW COLLECTIONS should become a red.collections table query");
};
assert_eq!(query.table, "red.collections");
assert!(query.filter.is_some());
let QueryExpr::Table(query) = expr("SHOW TABLES LIMIT 5") else {
panic!("SHOW TABLES should become a filtered red.collections table query");
};
assert_eq!(query.table, "red.collections");
assert_eq!(query.limit, Some(5));
assert!(query.filter.is_some());
assert!(matches!(
expr("SHOW CONFIG durability.mode"),
QueryExpr::ShowConfig { prefix: Some(prefix), as_json: false } if prefix == "durability.mode"
));
assert!(matches!(
expr("SHOW CONFIG"),
QueryExpr::ShowConfig {
prefix: None,
as_json: false
}
));
assert!(matches!(
expr("SHOW CONFIG runtime.result_cache AS JSON"),
QueryExpr::ShowConfig { prefix: Some(prefix), as_json: true } if prefix == "runtime.result_cache"
));
assert!(matches!(
expr("SHOW CONFIG FORMAT JSON"),
QueryExpr::ShowConfig {
prefix: None,
as_json: true
}
));
let QueryExpr::SetConfig { key, value } = expr("SET CONFIG durability.mode = 'sync'")
else {
panic!("SET CONFIG should stay on the SQL admin surface");
};
assert_eq!(key, "durability.mode");
assert_text(&value, "sync");
let QueryExpr::SetSecret { key, value } = expr("SET SECRET provider.api_key = 'sk_test'")
else {
panic!("SET SECRET should stay on the SQL admin surface");
};
assert_eq!(key, "provider.api_key");
assert_text(&value, "sk_test");
assert!(matches!(
expr("SET SECRET red.secrets.provider.api_key = 'sk_test'"),
QueryExpr::SetSecret { key, .. } if key == "red.secret.provider.api_key"
));
assert!(matches!(
expr("DELETE SECRET provider.api_key"),
QueryExpr::DeleteSecret { key } if key == "provider.api_key"
));
assert!(matches!(
expr("DELETE SECRET red.secrets.provider.api_key"),
QueryExpr::DeleteSecret { key } if key == "red.secret.provider.api_key"
));
assert!(matches!(
expr("SHOW SECRETS provider"),
QueryExpr::ShowSecrets { prefix: Some(prefix) } if prefix == "provider"
));
assert!(matches!(
expr("SHOW SECRETS red.secrets.provider"),
QueryExpr::ShowSecrets { prefix: Some(prefix) } if prefix == "red.secret.provider"
));
assert!(matches!(
expr("SET TENANT 'acme'"),
QueryExpr::SetTenant(Some(tenant)) if tenant == "acme"
));
assert!(matches!(expr("RESET TENANT"), QueryExpr::SetTenant(None)));
assert!(matches!(expr("SHOW TENANT"), QueryExpr::ShowTenant));
assert!(matches!(
expr("BEGIN ISOLATION LEVEL SNAPSHOT"),
QueryExpr::TransactionControl(TxnControl::Begin)
));
assert!(matches!(
expr("ROLLBACK TO SAVEPOINT sp1"),
QueryExpr::TransactionControl(TxnControl::RollbackToSavepoint(name)) if name == "sp1"
));
assert!(matches!(
expr("VACUUM FULL users"),
QueryExpr::MaintenanceCommand(MaintenanceCommand::Vacuum {
target: Some(target),
full: true,
}) if target == "users"
));
}
#[test]
fn parse_frontend_routes_extended_schema_sql() {
assert!(matches!(
expr("CREATE SCHEMA IF NOT EXISTS app"),
QueryExpr::CreateSchema(CreateSchemaQuery {
name,
if_not_exists: true,
}) if name == "app"
));
assert!(matches!(
expr("DROP SCHEMA IF EXISTS app CASCADE"),
QueryExpr::DropSchema(DropSchemaQuery {
name,
if_exists: true,
cascade: true,
}) if name == "app"
));
assert!(matches!(
expr("CREATE SEQUENCE IF NOT EXISTS seq START WITH 10 INCREMENT BY 2"),
QueryExpr::CreateSequence(CreateSequenceQuery {
name,
if_not_exists: true,
start: 10,
increment: 2,
}) if name == "seq"
));
assert!(matches!(
expr("DROP SEQUENCE IF EXISTS seq"),
QueryExpr::DropSequence(DropSequenceQuery {
name,
if_exists: true,
}) if name == "seq"
));
let QueryExpr::CopyFrom(copy) = expr(
"COPY users FROM '/tmp/u.csv' WITH (FORMAT = csv, HEADER = true, DELIMITER = ';')",
) else {
panic!("COPY should lower through SQL frontend");
};
assert_eq!(copy.table, "users");
assert_eq!(copy.path, "/tmp/u.csv");
assert_eq!(copy.format, CopyFormat::Csv);
assert_eq!(copy.delimiter, Some(';'));
assert!(copy.has_header);
let QueryExpr::CreateView(view) = expr(
"CREATE MATERIALIZED VIEW IF NOT EXISTS mv WITH RETENTION 1 h \
AS SELECT id FROM users REFRESH EVERY 5 s",
) else {
panic!("CREATE MATERIALIZED VIEW should lower through SQL frontend");
};
assert_eq!(view.name, "mv");
assert!(view.materialized);
assert!(view.if_not_exists);
assert_eq!(view.retention_duration_ms, Some(3_600_000));
assert_eq!(view.refresh_every_ms, Some(5_000));
assert!(matches!(*view.query, QueryExpr::Table(_)));
assert!(matches!(
expr("DROP MATERIALIZED VIEW IF EXISTS mv"),
QueryExpr::DropView(DropViewQuery {
name,
materialized: true,
if_exists: true,
}) if name == "mv"
));
assert!(matches!(
expr("REFRESH MATERIALIZED VIEW mv"),
QueryExpr::RefreshMaterializedView(RefreshMaterializedViewQuery { name }) if name == "mv"
));
}
#[test]
fn parse_frontend_routes_fdw_policy_auth_and_migrations() {
let QueryExpr::CreateServer(server) = expr(
"CREATE SERVER IF NOT EXISTS csvsrv FOREIGN DATA WRAPPER csv OPTIONS (path '/data')",
) else {
panic!("CREATE SERVER should lower through SQL frontend");
};
assert_eq!(server.name, "csvsrv");
assert_eq!(server.wrapper, "csv");
assert!(server.if_not_exists);
assert_eq!(
server.options,
vec![("path".to_string(), "/data".to_string())]
);
let QueryExpr::CreateForeignTable(table) = expr(
"CREATE FOREIGN TABLE IF NOT EXISTS ext_users \
(id INT, name TEXT) SERVER csvsrv OPTIONS (file 'users.csv')",
) else {
panic!("CREATE FOREIGN TABLE should lower through SQL frontend");
};
assert_eq!(table.name, "ext_users");
assert_eq!(table.server, "csvsrv");
assert!(table.if_not_exists);
assert_eq!(table.columns.len(), 2);
assert!(!table.columns[0].not_null);
assert!(matches!(
expr("DROP SERVER IF EXISTS csvsrv CASCADE"),
QueryExpr::DropServer(DropServerQuery {
name,
if_exists: true,
cascade: true,
}) if name == "csvsrv"
));
assert!(matches!(
expr("DROP FOREIGN TABLE IF EXISTS ext_users"),
QueryExpr::DropForeignTable(DropForeignTableQuery {
name,
if_exists: true,
}) if name == "ext_users"
));
let QueryExpr::CreatePolicy(policy) = expr(
"CREATE POLICY readonly ON NODES OF mygraph FOR SELECT TO analytics USING (public = 1)",
) else {
panic!("CREATE POLICY should lower through SQL frontend");
};
assert_eq!(policy.name, "readonly");
assert_eq!(policy.table, "mygraph");
assert_eq!(policy.action, Some(PolicyAction::Select));
assert_eq!(policy.role.as_deref(), Some("analytics"));
assert_eq!(policy.target_kind.as_ident(), "nodes");
assert!(matches!(
expr("DROP POLICY IF EXISTS readonly ON mygraph"),
QueryExpr::DropPolicy(DropPolicyQuery {
name,
table,
if_exists: true,
}) if name == "readonly" && table == "mygraph"
));
assert!(matches!(
expr("GRANT SELECT ON TABLE public.users TO tenant1.alice"),
QueryExpr::Grant(grant)
if grant.actions == vec!["SELECT"]
&& grant.objects[0].schema.as_deref() == Some("public")
));
assert!(matches!(
expr("REVOKE GRANT OPTION FOR USAGE ON SCHEMA analytics FROM GROUP analysts"),
QueryExpr::Revoke(revoke) if revoke.grant_option_for && revoke.all == false
));
assert!(matches!(
expr("ALTER USER bob ENABLE SET search_path TO 'public'"),
QueryExpr::AlterUser(user)
if user.username == "bob" && user.attributes.len() == 2
));
assert!(matches!(
expr("CREATE USER tenant1.alice WITH PASSWORD 'pw' ROLE write"),
QueryExpr::CreateUser(user)
if user.tenant.as_deref() == Some("tenant1")
&& user.username == "alice"
&& user.password == "pw"
&& user.role == "write"
));
assert!(matches!(
expr("CREATE POLICY 'readonly' AS '{\"Statement\":[]}'"),
QueryExpr::CreateIamPolicy { id, json }
if id == "readonly" && json == "{\"Statement\":[]}"
));
assert!(matches!(
expr("DROP POLICY 'readonly'"),
QueryExpr::DropIamPolicy { id } if id == "readonly"
));
assert!(matches!(
expr("CREATE MIGRATION m2 DEPENDS ON m0 BATCH 10 ROWS AS CREATE TABLE accounts (id INT)"),
QueryExpr::CreateMigration(migration)
if migration.name == "m2"
&& migration.depends_on == vec!["m0".to_string()]
&& migration.batch_size == Some(10)
));
assert!(matches!(
expr("APPLY MIGRATION * FOR TENANT tenant1"),
QueryExpr::ApplyMigration(apply)
if apply.for_tenant.as_deref() == Some("tenant1")
));
assert!(matches!(
expr("ROLLBACK MIGRATION m2"),
QueryExpr::RollbackMigration(RollbackMigrationQuery { name }) if name == "m2"
));
assert!(matches!(
expr("EXPLAIN MIGRATION m2"),
QueryExpr::ExplainMigration(ExplainMigrationQuery { name }) if name == "m2"
));
}
#[test]
fn parse_sql_statement_covers_statement_category_wrapping() {
enum Expected {
Select,
Insert,
CreateSchema,
SetTenant,
}
let cases = [
("SELECT * FROM users", Expected::Select),
("INSERT INTO users (id) VALUES (1)", Expected::Insert),
("CREATE SCHEMA app", Expected::CreateSchema),
("SET TENANT 'acme'", Expected::SetTenant),
];
for (input, expected) in cases {
let mut parser = Parser::new(input).expect("lexer");
let statement = parser
.parse_sql_statement()
.unwrap_or_else(|err| panic!("failed to parse {input:?}: {err:?}"));
let matched = match expected {
Expected::Select => matches!(statement, SqlStatement::Query(SqlQuery::Select(_))),
Expected::Insert => {
matches!(statement, SqlStatement::Mutation(SqlMutation::Insert(_)))
}
Expected::CreateSchema => matches!(
statement,
SqlStatement::Schema(SqlSchemaCommand::CreateSchema(_))
),
Expected::SetTenant => {
matches!(
statement,
SqlStatement::Admin(SqlAdminCommand::SetTenant(_))
)
}
};
assert!(matched, "{input}");
}
}
#[test]
fn parse_frontend_routes_non_sql_frontends() {
let QueryExpr::KvCommand(KvCommand::Get {
model,
collection,
key,
}) = expr("KV GET settings.feature")
else {
panic!("KV GET should route to FrontendStatement::KvCommand");
};
assert_eq!(model, CollectionModel::Kv);
assert_eq!(collection, "settings");
assert_eq!(key, "feature");
let QueryExpr::ConfigCommand(ConfigCommand::Watch {
collection,
key,
prefix,
from_lsn,
}) = expr("WATCH CONFIG app PREFIX feature FROM LSN 7")
else {
panic!("WATCH CONFIG should route to FrontendStatement::ConfigCommand");
};
assert_eq!(collection, "app");
assert_eq!(key, "feature");
assert!(prefix);
assert_eq!(from_lsn, Some(7));
let QueryExpr::ConfigCommand(ConfigCommand::List {
collection,
prefix,
limit,
offset,
}) = expr("LIST CONFIG app PREFIX feature LIMIT 3 OFFSET 1")
else {
panic!("LIST CONFIG should route to FrontendStatement::ConfigCommand");
};
assert_eq!(collection, "app");
assert_eq!(prefix.as_deref(), Some("feature"));
assert_eq!(limit, Some(3));
assert_eq!(offset, 1);
let QueryExpr::KvCommand(KvCommand::List {
model,
collection,
prefix,
limit,
offset,
as_json,
}) = expr("KV LIST settings PREFIX 'feature.' LIMIT 10 OFFSET 2")
else {
panic!("KV LIST should route to FrontendStatement::KvCommand");
};
assert_eq!(model, CollectionModel::Kv);
assert_eq!(collection, "settings");
assert_eq!(prefix.as_deref(), Some("feature."));
assert_eq!(limit, Some(10));
assert_eq!(offset, 2);
assert!(!as_json);
let QueryExpr::KvCommand(KvCommand::List {
model,
collection,
prefix,
limit,
offset,
as_json,
}) = expr("LIST KV settings PREFIX feature LIMIT 10 OFFSET 2")
else {
panic!("LIST KV should route to FrontendStatement::KvCommand");
};
assert_eq!(model, CollectionModel::Kv);
assert_eq!(collection, "settings");
assert_eq!(prefix.as_deref(), Some("feature"));
assert_eq!(limit, Some(10));
assert_eq!(offset, 2);
assert!(!as_json);
let QueryExpr::KvCommand(KvCommand::List {
model,
collection,
prefix,
as_json,
..
}) = expr("KV LIST settings PREFIX feature AS JSON")
else {
panic!("KV LIST AS JSON should route to FrontendStatement::KvCommand");
};
assert_eq!(model, CollectionModel::Kv);
assert_eq!(collection, "settings");
assert_eq!(prefix.as_deref(), Some("feature"));
assert!(as_json);
let QueryExpr::KvCommand(KvCommand::Watch {
model,
collection,
key,
prefix,
from_lsn,
}) = expr("WATCH sessions.user.* FROM LSN 3")
else {
panic!("bare WATCH should route to FrontendStatement::KvCommand");
};
assert_eq!(model, CollectionModel::Kv);
assert_eq!(collection, "sessions");
assert_eq!(key, "user");
assert!(prefix);
assert_eq!(from_lsn, Some(3));
let QueryExpr::KvCommand(KvCommand::Watch {
model,
collection,
key,
prefix,
from_lsn,
}) = expr("WATCH VAULT secrets PREFIX api FROM LSN 7")
else {
panic!("WATCH VAULT should route to FrontendStatement::KvCommand");
};
assert_eq!(model, CollectionModel::Vault);
assert_eq!(collection, "secrets");
assert_eq!(key, "api");
assert!(prefix);
assert_eq!(from_lsn, Some(7));
let QueryExpr::KvCommand(KvCommand::List {
model,
collection,
prefix,
limit,
offset,
as_json,
}) = expr("LIST VAULT secrets PREFIX api LIMIT 10 OFFSET 2")
else {
panic!("LIST VAULT should route to FrontendStatement::KvCommand");
};
assert_eq!(model, CollectionModel::Vault);
assert_eq!(collection, "secrets");
assert_eq!(prefix.as_deref(), Some("api"));
assert_eq!(limit, Some(10));
assert_eq!(offset, 2);
assert!(!as_json);
assert!(matches!(
expr("INVALIDATE CONFIG app feature_flag"),
QueryExpr::ConfigCommand(ConfigCommand::InvalidVolatileOperation {
operation,
collection,
key: Some(key),
}) if operation == "INVALIDATE" && collection == "app" && key == "feature_flag"
));
assert!(matches!(
expr("INVALIDATE TAGS [user:42, org:7] FROM sessions"),
QueryExpr::KvCommand(KvCommand::InvalidateTags { collection, tags })
if collection == "sessions" && tags == vec!["user:42".to_string(), "org:7".to_string()]
));
let QueryExpr::EventsBackfill(query) =
expr("EVENTS BACKFILL users WHERE status = 'active' TO audit LIMIT 10")
else {
panic!("EVENTS BACKFILL should route to FrontendStatement::EventsBackfill");
};
assert_eq!(query.collection, "users");
assert_eq!(query.where_filter.as_deref(), Some("status = 'active'"));
assert_eq!(query.target_queue, "audit");
assert_eq!(query.limit, Some(10));
let QueryExpr::Table(query) = expr("EVENTS STATUS users LIMIT 2") else {
panic!("EVENTS STATUS should route through the SQL select surface");
};
assert_eq!(query.table, "red.subscriptions");
assert_eq!(query.limit, Some(2));
assert!(query.filter.is_some());
assert!(matches!(
expr("EVENTS BACKFILL STATUS users"),
QueryExpr::EventsBackfillStatus { collection } if collection == "users"
));
assert!(parse_frontend("LIST UNKNOWN").is_err());
assert!(parse_frontend("EVENTS UNKNOWN").is_err());
}
#[test]
fn parse_frontend_routes_ranking_reads() {
assert!(matches!(
expr("RANK OF 42 IN page_rank"),
QueryExpr::RankOf(RankOfQuery { ranking, entity_id })
if ranking == "page_rank" && entity_id == 42
));
assert!(matches!(
expr("APPROX RANK OF 7 IN page_rank"),
QueryExpr::ApproxRankOf(RankOfQuery { ranking, entity_id })
if ranking == "page_rank" && entity_id == 7
));
assert!(matches!(
expr("RANK RANGE 1 TO 3 IN page_rank"),
QueryExpr::RankRange(RankRangeQuery { ranking, lo, hi })
if ranking == "page_rank" && lo == 1 && hi == 3
));
assert!(matches!(
expr("ZRANK page_rank 0"),
QueryExpr::RankOf(RankOfQuery { ranking, entity_id })
if ranking == "page_rank" && entity_id == 0
));
assert!(matches!(
expr("ZRANGE page_rank 0 3 WITHSCORES"),
QueryExpr::RankRange(RankRangeQuery { ranking, lo, hi })
if ranking == "page_rank" && lo == 1 && hi == 4
));
assert!(
parse_frontend("RANK RANGE 3 TO 1 IN page_rank").is_err(),
"rank range must reject reversed bounds"
);
}
#[test]
fn parse_frontend_covers_multimodel_command_routing() {
assert!(matches!(
expr("GRAPH CENTRALITY ALGORITHM pagerank LIMIT 5"),
QueryExpr::GraphCommand(GraphCommand::Centrality {
algorithm,
limit: Some(5),
..
}) if algorithm == "pagerank"
));
assert!(matches!(
expr("SEARCH TEXT 'login failure' COLLECTION incidents LIMIT 20 FUZZY"),
QueryExpr::SearchCommand(SearchCommand::Text {
query,
collection: Some(collection),
limit: 20,
fuzzy: true,
..
}) if query == "login failure" && collection == "incidents"
));
assert!(matches!(
expr("ASK 'why did login fail?' USING openai LIMIT 3"),
QueryExpr::Ask(query)
if query.question == "why did login fail?"
&& query.provider.as_deref() == Some("openai")
&& query.limit == Some(3)
));
assert!(matches!(
expr("QUEUE LEN tasks"),
QueryExpr::QueueCommand(QueueCommand::Len { queue }) if queue == "tasks"
));
assert!(matches!(
expr("TREE REBALANCE forest.org DRY RUN"),
QueryExpr::TreeCommand(TreeCommand::Rebalance {
collection,
tree_name,
dry_run: true,
}) if collection == "forest" && tree_name == "org"
));
assert!(matches!(
expr("HLL COUNT visitors"),
QueryExpr::ProbabilisticCommand(ProbabilisticCommand::HllCount { names })
if names == vec!["visitors".to_string()]
));
}
#[test]
fn sql_command_round_trips_multimodel_schema_variants() {
macro_rules! assert_command_round_trip {
($input:expr, $pattern:pat) => {{
let command = sql_command($input);
assert!(matches!(command, $pattern), "unexpected command for {}", $input);
let statement = sql_command($input).into_statement();
let command = statement.into_command();
assert!(
matches!(command, $pattern),
"statement round trip changed command for {}",
$input
);
let expr = sql_command($input).into_query_expr();
assert!(
!matches!(expr, QueryExpr::Table(TableQuery { table, .. }) if table.is_empty()),
"lowering produced an empty table placeholder for {}",
$input
);
}};
}
assert_command_round_trip!(
"EXPLAIN ALTER FOR CREATE TABLE users (id INT) FORMAT JSON",
SqlCommand::ExplainAlter(_)
);
assert_command_round_trip!("CREATE TABLE users (id INT)", SqlCommand::CreateTable(_));
assert_command_round_trip!("DROP TABLE IF EXISTS users", SqlCommand::DropTable(_));
assert_command_round_trip!(
"ALTER TABLE users ADD COLUMN status TEXT",
SqlCommand::AlterTable(_)
);
assert_command_round_trip!(
"CREATE INDEX idx_email ON users (email) USING HASH",
SqlCommand::CreateIndex(_)
);
assert_command_round_trip!(
"DROP INDEX IF EXISTS idx_email ON users",
SqlCommand::DropIndex(_)
);
assert_command_round_trip!("CREATE GRAPH identity", SqlCommand::CreateTable(_));
assert_command_round_trip!("CREATE DOCUMENT docs", SqlCommand::CreateTable(_));
assert_command_round_trip!(
"CREATE VECTOR embeddings DIM 4",
SqlCommand::CreateVector(_)
);
assert_command_round_trip!(
"CREATE COLLECTION turbo KIND vector.turbo DIM 3",
SqlCommand::CreateCollection(_)
);
assert_command_round_trip!("CREATE KV settings", SqlCommand::CreateTable(_));
assert_command_round_trip!("CREATE CONFIG app", SqlCommand::CreateTable(_));
assert_command_round_trip!(
"CREATE VAULT secrets WITH OWN MASTER KEY",
SqlCommand::CreateTable(_)
);
assert_command_round_trip!(
"CREATE TIMESERIES metrics RETENTION 90 d",
SqlCommand::CreateTimeSeries(_)
);
assert_command_round_trip!(
"CREATE METRIC svc.latency TYPE gauge ROLE sli",
SqlCommand::CreateMetric(_)
);
assert_command_round_trip!(
"ALTER METRIC svc.latency SET ROLE internal",
SqlCommand::AlterMetric(_)
);
assert_command_round_trip!(
"CREATE SLO svc.availability ON svc.latency TARGET 99.9 WINDOW 5 m",
SqlCommand::CreateSlo(_)
);
assert_command_round_trip!(
"CREATE QUEUE tasks MAX_SIZE 100",
SqlCommand::CreateQueue(_)
);
assert_command_round_trip!(
"ALTER QUEUE tasks SET MODE FANOUT",
SqlCommand::AlterQueue(_)
);
assert_command_round_trip!(
"CREATE TREE org IN forest ROOT LABEL root MAX_CHILDREN 4",
SqlCommand::CreateTree(_)
);
assert_command_round_trip!(
"CREATE HLL visitors PRECISION 14",
SqlCommand::Probabilistic(_)
);
assert_command_round_trip!(
"CREATE SKETCH freqs WIDTH 512 DEPTH 3",
SqlCommand::Probabilistic(_)
);
assert_command_round_trip!(
"CREATE FILTER seen CAPACITY 1024",
SqlCommand::Probabilistic(_)
);
assert_command_round_trip!("COPY users FROM '/tmp/u.csv'", SqlCommand::CopyFrom(_));
assert_command_round_trip!(
"CREATE VIEW active_users AS SELECT * FROM users",
SqlCommand::CreateView(_)
);
assert_command_round_trip!("DROP VIEW active_users", SqlCommand::DropView(_));
assert_command_round_trip!(
"REFRESH MATERIALIZED VIEW active_users",
SqlCommand::RefreshMaterializedView(_)
);
assert_command_round_trip!(
"CREATE SERVER mycsv FOREIGN DATA WRAPPER csv OPTIONS (base_path '/data')",
SqlCommand::CreateServer(_)
);
assert_command_round_trip!(
"DROP SERVER IF EXISTS mycsv CASCADE",
SqlCommand::DropServer(_)
);
assert_command_round_trip!(
"CREATE FOREIGN TABLE ext_users (id INT, name TEXT) SERVER mycsv OPTIONS (path 'users.csv')",
SqlCommand::CreateForeignTable(_)
);
assert_command_round_trip!(
"DROP FOREIGN TABLE IF EXISTS ext_users",
SqlCommand::DropForeignTable(_)
);
}
#[test]
fn sql_command_round_trips_drop_truncate_and_maintenance_variants() {
macro_rules! assert_command_round_trip {
($input:expr, $pattern:pat) => {{
let command = sql_command($input);
assert!(
matches!(command, $pattern),
"unexpected command for {}",
$input
);
let statement = sql_command($input).into_statement();
assert!(
matches!(statement.into_command(), $pattern),
"statement round trip changed command for {}",
$input
);
}};
}
assert_command_round_trip!("DROP GRAPH IF EXISTS identity", SqlCommand::DropGraph(_));
assert_command_round_trip!(
"DROP VECTOR IF EXISTS embeddings",
SqlCommand::DropVector(_)
);
assert_command_round_trip!("DROP DOCUMENT IF EXISTS docs", SqlCommand::DropDocument(_));
assert_command_round_trip!("DROP KV IF EXISTS settings", SqlCommand::DropKv(_));
assert_command_round_trip!("DROP CONFIG IF EXISTS app", SqlCommand::DropKv(_));
assert_command_round_trip!("DROP VAULT IF EXISTS secrets", SqlCommand::DropKv(_));
assert_command_round_trip!(
"DROP COLLECTION IF EXISTS docs",
SqlCommand::DropCollection(_)
);
assert_command_round_trip!(
"DROP TIMESERIES IF EXISTS metrics",
SqlCommand::DropTimeSeries(_)
);
assert_command_round_trip!(
"DROP HYPERTABLE IF EXISTS metrics",
SqlCommand::DropTimeSeries(_)
);
assert_command_round_trip!("DROP QUEUE IF EXISTS tasks", SqlCommand::DropQueue(_));
assert_command_round_trip!("DROP TREE IF EXISTS org IN forest", SqlCommand::DropTree(_));
assert_command_round_trip!("DROP HLL IF EXISTS visitors", SqlCommand::Probabilistic(_));
assert_command_round_trip!("DROP SKETCH IF EXISTS freqs", SqlCommand::Probabilistic(_));
assert_command_round_trip!("DROP FILTER IF EXISTS seen", SqlCommand::Probabilistic(_));
assert_command_round_trip!(
"TRUNCATE VECTOR IF EXISTS embeddings",
SqlCommand::Truncate(_)
);
assert_command_round_trip!(
"COMMIT WORK",
SqlCommand::TransactionControl(TxnControl::Commit)
);
assert_command_round_trip!(
"ROLLBACK",
SqlCommand::TransactionControl(TxnControl::Rollback)
);
assert_command_round_trip!(
"SAVEPOINT before_batch",
SqlCommand::TransactionControl(TxnControl::Savepoint(_))
);
assert_command_round_trip!(
"RELEASE SAVEPOINT before_batch",
SqlCommand::TransactionControl(TxnControl::ReleaseSavepoint(_))
);
assert_command_round_trip!(
"ANALYZE users",
SqlCommand::Maintenance(MaintenanceCommand::Analyze { .. })
);
assert_command_round_trip!(
"VACUUM",
SqlCommand::Maintenance(MaintenanceCommand::Vacuum { .. })
);
}
#[test]
fn parse_sql_command_covers_show_and_error_branches() {
assert!(matches!(
sql_command("SHOW CREATE TABLE public.users"),
SqlCommand::Select(TableQuery { table, .. }) if table == "red.show_create"
));
assert!(matches!(
sql_command("SHOW COLLECTIONS INCLUDING INTERNAL LIMIT 2"),
SqlCommand::Select(TableQuery { table, limit: Some(2), .. }) if table == "red.collections"
));
assert!(matches!(
sql_command("SHOW QUEUES INCLUDING INTERNAL"),
SqlCommand::Select(TableQuery { table, filter: None, .. }) if table == "red.queues"
));
assert!(matches!(
sql_command("SHOW INDICES ON users"),
SqlCommand::Select(TableQuery { table, filter: Some(_), .. }) if table == "red.show_indexes"
));
assert!(matches!(
sql_command("SHOW POLICIES ON users WHERE action = 'SELECT'"),
SqlCommand::Select(TableQuery { table, filter: Some(_), .. }) if table == "red.policies"
));
assert!(matches!(
sql_command("SHOW STATS 'users' WHERE rows > 0"),
SqlCommand::Select(TableQuery { table, filter: Some(_), .. }) if table == "red.stats"
));
assert!(matches!(
sql_command("SHOW SAMPLE users"),
SqlCommand::Select(TableQuery { table, limit: Some(10), .. }) if table == "users"
));
assert!(matches!(
sql_command("DESC public.users"),
SqlCommand::Select(TableQuery { table, filter: Some(_), .. }) if table == "red.describe"
));
assert!(
sql_command_result("CREATE VIEW v WITH RETENTION 1 h AS SELECT * FROM users").is_err()
);
assert!(sql_command_result("CREATE TABLE bad WITH ANALYTICS (centrality)").is_err());
assert!(sql_command_result("BEGIN ISOLATION LEVEL SERIALIZABLE").is_err());
assert!(sql_command_result("EVENTS BACKFILL STATUS users").is_err());
}
#[test]
fn parse_sql_command_covers_remaining_catalog_and_copy_shapes() {
for input in [
"SHOW VECTORS",
"SHOW DOCUMENTS",
"SHOW TIMESERIES",
"SHOW GRAPHS",
"SHOW CONFIGS",
"SHOW VAULTS",
"SHOW KV",
"SHOW SCHEMA public.users",
] {
assert!(
matches!(sql_command(input), SqlCommand::Select(_)),
"{input}"
);
}
for input in [
"TRUNCATE TABLE users",
"TRUNCATE GRAPH identity",
"TRUNCATE DOCUMENT docs",
"TRUNCATE TIMESERIES metrics",
"TRUNCATE METRICS metrics",
"TRUNCATE KV settings",
"TRUNCATE QUEUE tasks",
"TRUNCATE COLLECTION docs",
] {
assert!(
matches!(sql_command(input), SqlCommand::Truncate(_)),
"{input}"
);
}
assert!(sql_command_result("TRUNCATE UNKNOWN users").is_err());
let SqlCommand::CopyFrom(copy) = sql_command("COPY users FROM '/tmp/u.csv' WITH (HEADER)")
else {
panic!("expected COPY");
};
assert!(copy.has_header);
assert_eq!(copy.delimiter, None);
let SqlCommand::CopyFrom(copy) =
sql_command("COPY users FROM '/tmp/u.csv' WITH (HEADER = false)")
else {
panic!("expected COPY");
};
assert!(!copy.has_header);
let SqlCommand::CopyFrom(copy) =
sql_command("COPY users FROM '/tmp/u.csv' DELIMITER '|' HEADER")
else {
panic!("expected COPY");
};
assert_eq!(copy.delimiter, Some('|'));
assert!(copy.has_header);
}
#[test]
fn parse_sql_command_covers_remaining_ranking_and_event_errors() {
assert!(matches!(
expr("APPROXIMATE RANK OF 9 IN page_rank"),
QueryExpr::ApproxRankOf(RankOfQuery { ranking, entity_id })
if ranking == "page_rank" && entity_id == 9
));
assert!(parse_frontend("APPROX OF 7 IN page_rank").is_err());
assert!(parse_frontend("RANK 1 IN page_rank").is_err());
assert!(parse_frontend("RANK RANGE 0 TO 3 IN page_rank").is_err());
assert!(parse_frontend("ZRANK page_rank -1").is_err());
assert!(parse_frontend("ZRANGE page_rank 3 1").is_err());
let QueryExpr::Table(query) = expr("EVENTS STATUS 'users' WHERE active = true") else {
panic!("EVENTS STATUS should accept a quoted collection");
};
assert_eq!(query.table, "red.subscriptions");
assert!(query.filter.is_some());
assert!(query.where_expr.is_some());
let QueryExpr::EventsBackfill(query) = expr("EVENTS BACKFILL users TO audit") else {
panic!("EVENTS BACKFILL should allow omitted filter and limit");
};
assert_eq!(query.collection, "users");
assert_eq!(query.where_filter, None);
assert_eq!(query.limit, None);
assert!(parse_frontend("EVENTS BACKFILL users WHERE TO audit").is_err());
}
#[test]
fn parse_sql_command_covers_analytics_non_goal_rejections() {
for head in ["ANALYTICS", "EVENT", "COHORT", "FUNNEL", "SLA", "ADAPTER"] {
let err = sql_command_result(&format!("CREATE {head} demo"))
.expect_err("analytics v0 non-goal should be rejected");
assert!(err.to_string().contains(&format!("CREATE {head}")), "{err}");
}
}
#[test]
fn parse_sql_command_covers_transaction_isolation_edges() {
for input in [
"BEGIN ISOLATION LEVEL READ UNCOMMITTED",
"BEGIN ISOLATION LEVEL READ COMMITTED",
"BEGIN ISOLATION LEVEL REPEATABLE READ",
"START TRANSACTION ISOLATION LEVEL SNAPSHOT",
] {
assert!(
matches!(
sql_command(input),
SqlCommand::TransactionControl(TxnControl::Begin)
),
"{input}"
);
}
assert!(sql_command_result("BEGIN ISOLATION LEVEL READ").is_err());
assert!(sql_command_result("BEGIN ISOLATION LEVEL REPEATABLE").is_err());
assert!(sql_command_result("BEGIN ISOLATION LEVEL CHAOS").is_err());
}
#[test]
fn parse_sql_command_covers_iam_and_hypertable_dispatch_edges() {
assert!(matches!(
expr("CREATE HYPERTABLE metrics TIME_COLUMN ts CHUNK_INTERVAL '1d'"),
QueryExpr::CreateTimeSeries(query)
if query.name == "metrics" && query.hypertable.is_some()
));
assert!(sql_command_result("CREATE OR TABLE bad (id INT)").is_err());
assert!(sql_command_result("DROP MATERIALIZED TABLE bad").is_err());
assert!(matches!(
expr("ATTACH POLICY 'readonly' TO USER tenant1.alice"),
QueryExpr::AttachPolicy { policy_id, .. } if policy_id == "readonly"
));
assert!(matches!(
expr("DETACH POLICY 'readonly' FROM GROUP analysts"),
QueryExpr::DetachPolicy { policy_id, .. } if policy_id == "readonly"
));
assert!(matches!(
expr("SHOW POLICIES FOR USER alice"),
QueryExpr::ShowPolicies { filter: Some(_) }
));
assert!(matches!(
expr("SHOW EFFECTIVE PERMISSIONS FOR alice"),
QueryExpr::ShowEffectivePermissions { resource: None, .. }
));
assert!(matches!(
expr("SIMULATE alice ACTION 'iam:PassRole' ON TABLE:public.orders"),
QueryExpr::SimulatePolicy { action, .. } if action == "iam:PassRole"
));
assert!(matches!(
expr("LINT POLICY JSON '{\"Statement\":[]}'"),
QueryExpr::LintPolicy { .. }
));
assert!(matches!(
expr("MIGRATE POLICY MODE TO 'policy_only' DRY RUN"),
QueryExpr::MigratePolicyMode {
target,
dry_run: true,
} if target == "policy_only"
));
assert!(parse_frontend("MIGRATE OTHER").is_err());
}
#[test]
fn parse_frontend_rejects_trailing_tokens() {
let err = parse_frontend("SET TENANT 'acme' junk")
.expect_err("parse_frontend should reject trailing tokens");
assert!(
err.to_string().contains("Unexpected token after query"),
"{err}"
);
}
}
fn add_table_filter(query: &mut TableQuery, filter: Filter) {
let combined = match query.filter.take() {
Some(existing) => existing.and(filter),
None => filter,
};
query.where_expr = Some(filter_to_expr(&combined));
query.filter = Some(combined);
}
fn parse_show_collections_by_model(
parser: &mut Parser<'_>,
model: &str,
) -> Result<TableQuery, ParseError> {
let mut query = TableQuery::new("red.collections");
parser.parse_table_clauses(&mut query)?;
add_table_filter(&mut query, collection_model_filter(model));
Ok(query)
}
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum SqlQuery {
Select(TableQuery),
Join(JoinQuery),
}
#[derive(Debug, Clone)]
pub enum SqlMutation {
Insert(InsertQuery),
Update(UpdateQuery),
Delete(DeleteQuery),
}
#[derive(Debug, Clone)]
pub enum SqlSchemaCommand {
ExplainAlter(ExplainAlterQuery),
CreateTable(CreateTableQuery),
CreateCollection(CreateCollectionQuery),
CreateVector(CreateVectorQuery),
DropTable(DropTableQuery),
DropGraph(DropGraphQuery),
DropVector(DropVectorQuery),
DropDocument(DropDocumentQuery),
DropKv(DropKvQuery),
DropCollection(DropCollectionQuery),
Truncate(TruncateQuery),
AlterTable(AlterTableQuery),
CreateIndex(CreateIndexQuery),
DropIndex(DropIndexQuery),
CreateTimeSeries(CreateTimeSeriesQuery),
CreateMetric(CreateMetricQuery),
AlterMetric(AlterMetricQuery),
CreateSlo(CreateSloQuery),
DropTimeSeries(DropTimeSeriesQuery),
CreateQueue(CreateQueueQuery),
AlterQueue(AlterQueueQuery),
DropQueue(DropQueueQuery),
CreateTree(CreateTreeQuery),
DropTree(DropTreeQuery),
Probabilistic(ProbabilisticCommand),
CreateSchema(CreateSchemaQuery),
DropSchema(DropSchemaQuery),
CreateSequence(CreateSequenceQuery),
DropSequence(DropSequenceQuery),
CopyFrom(CopyFromQuery),
CreateView(CreateViewQuery),
DropView(DropViewQuery),
RefreshMaterializedView(RefreshMaterializedViewQuery),
CreatePolicy(CreatePolicyQuery),
DropPolicy(DropPolicyQuery),
CreateServer(CreateServerQuery),
DropServer(DropServerQuery),
CreateForeignTable(CreateForeignTableQuery),
DropForeignTable(DropForeignTableQuery),
CreateMigration(CreateMigrationQuery),
ApplyMigration(ApplyMigrationQuery),
RollbackMigration(RollbackMigrationQuery),
ExplainMigration(ExplainMigrationQuery),
}
#[derive(Debug, Clone)]
#[allow(clippy::large_enum_variant)]
pub enum SqlAdminCommand {
SetConfig {
key: String,
value: Value,
},
ShowConfig {
prefix: Option<String>,
as_json: bool,
},
SetSecret {
key: String,
value: Value,
},
DeleteSecret {
key: String,
},
ShowSecrets {
prefix: Option<String>,
},
SetTenant(Option<String>),
ShowTenant,
TransactionControl(TxnControl),
Maintenance(MaintenanceCommand),
Grant(GrantStmt),
Revoke(RevokeStmt),
AlterUser(AlterUserStmt),
CreateUser(CreateUserStmt),
IamPolicy(QueryExpr),
}
impl SqlStatement {
pub fn into_command(self) -> SqlCommand {
match self {
SqlStatement::Query(SqlQuery::Select(query)) => SqlCommand::Select(query),
SqlStatement::Query(SqlQuery::Join(query)) => SqlCommand::Join(query),
SqlStatement::Mutation(SqlMutation::Insert(query)) => SqlCommand::Insert(query),
SqlStatement::Mutation(SqlMutation::Update(query)) => SqlCommand::Update(query),
SqlStatement::Mutation(SqlMutation::Delete(query)) => SqlCommand::Delete(query),
SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query)) => {
SqlCommand::ExplainAlter(query)
}
SqlStatement::Schema(SqlSchemaCommand::CreateTable(query)) => {
SqlCommand::CreateTable(query)
}
SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query)) => {
SqlCommand::CreateCollection(query)
}
SqlStatement::Schema(SqlSchemaCommand::CreateVector(query)) => {
SqlCommand::CreateVector(query)
}
SqlStatement::Schema(SqlSchemaCommand::DropTable(query)) => {
SqlCommand::DropTable(query)
}
SqlStatement::Schema(SqlSchemaCommand::DropGraph(query)) => {
SqlCommand::DropGraph(query)
}
SqlStatement::Schema(SqlSchemaCommand::DropVector(query)) => {
SqlCommand::DropVector(query)
}
SqlStatement::Schema(SqlSchemaCommand::DropDocument(query)) => {
SqlCommand::DropDocument(query)
}
SqlStatement::Schema(SqlSchemaCommand::DropKv(query)) => SqlCommand::DropKv(query),
SqlStatement::Schema(SqlSchemaCommand::DropCollection(query)) => {
SqlCommand::DropCollection(query)
}
SqlStatement::Schema(SqlSchemaCommand::Truncate(query)) => SqlCommand::Truncate(query),
SqlStatement::Schema(SqlSchemaCommand::AlterTable(query)) => {
SqlCommand::AlterTable(query)
}
SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query)) => {
SqlCommand::CreateIndex(query)
}
SqlStatement::Schema(SqlSchemaCommand::DropIndex(query)) => {
SqlCommand::DropIndex(query)
}
SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query)) => {
SqlCommand::CreateTimeSeries(query)
}
SqlStatement::Schema(SqlSchemaCommand::CreateMetric(query)) => {
SqlCommand::CreateMetric(query)
}
SqlStatement::Schema(SqlSchemaCommand::AlterMetric(query)) => {
SqlCommand::AlterMetric(query)
}
SqlStatement::Schema(SqlSchemaCommand::CreateSlo(query)) => {
SqlCommand::CreateSlo(query)
}
SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query)) => {
SqlCommand::DropTimeSeries(query)
}
SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query)) => {
SqlCommand::CreateQueue(query)
}
SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query)) => {
SqlCommand::AlterQueue(query)
}
SqlStatement::Schema(SqlSchemaCommand::DropQueue(query)) => {
SqlCommand::DropQueue(query)
}
SqlStatement::Schema(SqlSchemaCommand::CreateTree(query)) => {
SqlCommand::CreateTree(query)
}
SqlStatement::Schema(SqlSchemaCommand::DropTree(query)) => SqlCommand::DropTree(query),
SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command)) => {
SqlCommand::Probabilistic(command)
}
SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value }) => {
SqlCommand::SetConfig { key, value }
}
SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix, as_json }) => {
SqlCommand::ShowConfig { prefix, as_json }
}
SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value }) => {
SqlCommand::SetSecret { key, value }
}
SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key }) => {
SqlCommand::DeleteSecret { key }
}
SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix }) => {
SqlCommand::ShowSecrets { prefix }
}
SqlStatement::Admin(SqlAdminCommand::SetTenant(value)) => SqlCommand::SetTenant(value),
SqlStatement::Admin(SqlAdminCommand::ShowTenant) => SqlCommand::ShowTenant,
SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl)) => {
SqlCommand::TransactionControl(ctl)
}
SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)) => SqlCommand::Maintenance(cmd),
SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)) => SqlCommand::CreateSchema(q),
SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)) => SqlCommand::DropSchema(q),
SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q)) => {
SqlCommand::CreateSequence(q)
}
SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)) => SqlCommand::DropSequence(q),
SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)) => SqlCommand::CopyFrom(q),
SqlStatement::Schema(SqlSchemaCommand::CreateView(q)) => SqlCommand::CreateView(q),
SqlStatement::Schema(SqlSchemaCommand::DropView(q)) => SqlCommand::DropView(q),
SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q)) => {
SqlCommand::RefreshMaterializedView(q)
}
SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)) => SqlCommand::CreatePolicy(q),
SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)) => SqlCommand::DropPolicy(q),
SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)) => SqlCommand::CreateServer(q),
SqlStatement::Schema(SqlSchemaCommand::DropServer(q)) => SqlCommand::DropServer(q),
SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q)) => {
SqlCommand::CreateForeignTable(q)
}
SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q)) => {
SqlCommand::DropForeignTable(q)
}
SqlStatement::Admin(SqlAdminCommand::Grant(s)) => SqlCommand::Grant(s),
SqlStatement::Admin(SqlAdminCommand::Revoke(s)) => SqlCommand::Revoke(s),
SqlStatement::Admin(SqlAdminCommand::AlterUser(s)) => SqlCommand::AlterUser(s),
SqlStatement::Admin(SqlAdminCommand::CreateUser(s)) => SqlCommand::CreateUser(s),
SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)) => SqlCommand::IamPolicy(e),
SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q)) => {
SqlCommand::CreateMigration(q)
}
SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q)) => {
SqlCommand::ApplyMigration(q)
}
SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q)) => {
SqlCommand::RollbackMigration(q)
}
SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q)) => {
SqlCommand::ExplainMigration(q)
}
}
}
pub fn into_query_expr(self) -> QueryExpr {
self.into_command().into_query_expr()
}
}
impl FrontendStatement {
pub fn into_query_expr(self) -> QueryExpr {
match self {
FrontendStatement::Sql(statement) => statement.into_query_expr(),
FrontendStatement::Graph(query) => QueryExpr::Graph(query),
FrontendStatement::GraphCommand(command) => QueryExpr::GraphCommand(command),
FrontendStatement::Path(query) => QueryExpr::Path(query),
FrontendStatement::Vector(query) => QueryExpr::Vector(query),
FrontendStatement::Hybrid(query) => QueryExpr::Hybrid(query),
FrontendStatement::Search(command) => QueryExpr::SearchCommand(command),
FrontendStatement::Ask(query) => QueryExpr::Ask(query),
FrontendStatement::QueueSelect(query) => QueryExpr::QueueSelect(query),
FrontendStatement::QueueCommand(command) => QueryExpr::QueueCommand(command),
FrontendStatement::EventsBackfill(query) => QueryExpr::EventsBackfill(query),
FrontendStatement::EventsBackfillStatus { collection } => {
QueryExpr::EventsBackfillStatus { collection }
}
FrontendStatement::TreeCommand(command) => QueryExpr::TreeCommand(command),
FrontendStatement::ProbabilisticCommand(command) => {
QueryExpr::ProbabilisticCommand(command)
}
FrontendStatement::KvCommand(command) => QueryExpr::KvCommand(command),
FrontendStatement::ConfigCommand(command) => QueryExpr::ConfigCommand(command),
FrontendStatement::Ranking(expr) => expr,
}
}
}
pub fn parse_frontend(input: &str) -> Result<FrontendStatement, ParseError> {
let mut parser = Parser::new(input)?;
let statement = parser.parse_frontend_statement()?;
if !parser.check(&Token::Eof) {
return Err(ParseError::new(
format!("Unexpected token after query: {:?}", parser.current.token),
parser.position(),
));
}
Ok(statement)
}
impl SqlCommand {
pub fn into_query_expr(self) -> QueryExpr {
match self {
SqlCommand::Select(query) => QueryExpr::Table(query),
SqlCommand::Join(query) => QueryExpr::Join(query),
SqlCommand::Insert(query) => QueryExpr::Insert(query),
SqlCommand::Update(query) => QueryExpr::Update(query),
SqlCommand::Delete(query) => QueryExpr::Delete(query),
SqlCommand::ExplainAlter(query) => QueryExpr::ExplainAlter(query),
SqlCommand::CreateTable(query) => QueryExpr::CreateTable(query),
SqlCommand::CreateCollection(query) => QueryExpr::CreateCollection(query),
SqlCommand::CreateVector(query) => QueryExpr::CreateVector(query),
SqlCommand::DropTable(query) => QueryExpr::DropTable(query),
SqlCommand::DropGraph(query) => QueryExpr::DropGraph(query),
SqlCommand::DropVector(query) => QueryExpr::DropVector(query),
SqlCommand::DropDocument(query) => QueryExpr::DropDocument(query),
SqlCommand::DropKv(query) => QueryExpr::DropKv(query),
SqlCommand::DropCollection(query) => QueryExpr::DropCollection(query),
SqlCommand::Truncate(query) => QueryExpr::Truncate(query),
SqlCommand::AlterTable(query) => QueryExpr::AlterTable(query),
SqlCommand::CreateIndex(query) => QueryExpr::CreateIndex(query),
SqlCommand::DropIndex(query) => QueryExpr::DropIndex(query),
SqlCommand::CreateTimeSeries(query) => QueryExpr::CreateTimeSeries(query),
SqlCommand::CreateMetric(query) => QueryExpr::CreateMetric(query),
SqlCommand::AlterMetric(query) => QueryExpr::AlterMetric(query),
SqlCommand::CreateSlo(query) => QueryExpr::CreateSlo(query),
SqlCommand::DropTimeSeries(query) => QueryExpr::DropTimeSeries(query),
SqlCommand::CreateQueue(query) => QueryExpr::CreateQueue(query),
SqlCommand::AlterQueue(query) => QueryExpr::AlterQueue(query),
SqlCommand::DropQueue(query) => QueryExpr::DropQueue(query),
SqlCommand::CreateTree(query) => QueryExpr::CreateTree(query),
SqlCommand::DropTree(query) => QueryExpr::DropTree(query),
SqlCommand::Probabilistic(command) => QueryExpr::ProbabilisticCommand(command),
SqlCommand::SetConfig { key, value } => QueryExpr::SetConfig { key, value },
SqlCommand::ShowConfig { prefix, as_json } => QueryExpr::ShowConfig { prefix, as_json },
SqlCommand::SetSecret { key, value } => QueryExpr::SetSecret { key, value },
SqlCommand::DeleteSecret { key } => QueryExpr::DeleteSecret { key },
SqlCommand::ShowSecrets { prefix } => QueryExpr::ShowSecrets { prefix },
SqlCommand::SetTenant(value) => QueryExpr::SetTenant(value),
SqlCommand::ShowTenant => QueryExpr::ShowTenant,
SqlCommand::TransactionControl(ctl) => QueryExpr::TransactionControl(ctl),
SqlCommand::Maintenance(cmd) => QueryExpr::MaintenanceCommand(cmd),
SqlCommand::CreateSchema(q) => QueryExpr::CreateSchema(q),
SqlCommand::DropSchema(q) => QueryExpr::DropSchema(q),
SqlCommand::CreateSequence(q) => QueryExpr::CreateSequence(q),
SqlCommand::DropSequence(q) => QueryExpr::DropSequence(q),
SqlCommand::CopyFrom(q) => QueryExpr::CopyFrom(q),
SqlCommand::CreateView(q) => QueryExpr::CreateView(q),
SqlCommand::DropView(q) => QueryExpr::DropView(q),
SqlCommand::RefreshMaterializedView(q) => QueryExpr::RefreshMaterializedView(q),
SqlCommand::CreatePolicy(q) => QueryExpr::CreatePolicy(q),
SqlCommand::DropPolicy(q) => QueryExpr::DropPolicy(q),
SqlCommand::CreateServer(q) => QueryExpr::CreateServer(q),
SqlCommand::DropServer(q) => QueryExpr::DropServer(q),
SqlCommand::CreateForeignTable(q) => QueryExpr::CreateForeignTable(q),
SqlCommand::DropForeignTable(q) => QueryExpr::DropForeignTable(q),
SqlCommand::Grant(s) => QueryExpr::Grant(s),
SqlCommand::Revoke(s) => QueryExpr::Revoke(s),
SqlCommand::AlterUser(s) => QueryExpr::AlterUser(s),
SqlCommand::CreateUser(s) => QueryExpr::CreateUser(s),
SqlCommand::IamPolicy(e) => e,
SqlCommand::CreateMigration(q) => QueryExpr::CreateMigration(q),
SqlCommand::ApplyMigration(q) => QueryExpr::ApplyMigration(q),
SqlCommand::RollbackMigration(q) => QueryExpr::RollbackMigration(q),
SqlCommand::ExplainMigration(q) => QueryExpr::ExplainMigration(q),
}
}
pub fn into_statement(self) -> SqlStatement {
match self {
SqlCommand::Select(query) => SqlStatement::Query(SqlQuery::Select(query)),
SqlCommand::Join(query) => SqlStatement::Query(SqlQuery::Join(query)),
SqlCommand::Insert(query) => SqlStatement::Mutation(SqlMutation::Insert(query)),
SqlCommand::Update(query) => SqlStatement::Mutation(SqlMutation::Update(query)),
SqlCommand::Delete(query) => SqlStatement::Mutation(SqlMutation::Delete(query)),
SqlCommand::ExplainAlter(query) => {
SqlStatement::Schema(SqlSchemaCommand::ExplainAlter(query))
}
SqlCommand::CreateTable(query) => {
SqlStatement::Schema(SqlSchemaCommand::CreateTable(query))
}
SqlCommand::CreateCollection(query) => {
SqlStatement::Schema(SqlSchemaCommand::CreateCollection(query))
}
SqlCommand::CreateVector(query) => {
SqlStatement::Schema(SqlSchemaCommand::CreateVector(query))
}
SqlCommand::DropTable(query) => {
SqlStatement::Schema(SqlSchemaCommand::DropTable(query))
}
SqlCommand::DropGraph(query) => {
SqlStatement::Schema(SqlSchemaCommand::DropGraph(query))
}
SqlCommand::DropVector(query) => {
SqlStatement::Schema(SqlSchemaCommand::DropVector(query))
}
SqlCommand::DropDocument(query) => {
SqlStatement::Schema(SqlSchemaCommand::DropDocument(query))
}
SqlCommand::DropKv(query) => SqlStatement::Schema(SqlSchemaCommand::DropKv(query)),
SqlCommand::DropCollection(query) => {
SqlStatement::Schema(SqlSchemaCommand::DropCollection(query))
}
SqlCommand::Truncate(query) => SqlStatement::Schema(SqlSchemaCommand::Truncate(query)),
SqlCommand::AlterTable(query) => {
SqlStatement::Schema(SqlSchemaCommand::AlterTable(query))
}
SqlCommand::CreateIndex(query) => {
SqlStatement::Schema(SqlSchemaCommand::CreateIndex(query))
}
SqlCommand::DropIndex(query) => {
SqlStatement::Schema(SqlSchemaCommand::DropIndex(query))
}
SqlCommand::CreateTimeSeries(query) => {
SqlStatement::Schema(SqlSchemaCommand::CreateTimeSeries(query))
}
SqlCommand::CreateMetric(query) => {
SqlStatement::Schema(SqlSchemaCommand::CreateMetric(query))
}
SqlCommand::AlterMetric(query) => {
SqlStatement::Schema(SqlSchemaCommand::AlterMetric(query))
}
SqlCommand::CreateSlo(query) => {
SqlStatement::Schema(SqlSchemaCommand::CreateSlo(query))
}
SqlCommand::DropTimeSeries(query) => {
SqlStatement::Schema(SqlSchemaCommand::DropTimeSeries(query))
}
SqlCommand::CreateQueue(query) => {
SqlStatement::Schema(SqlSchemaCommand::CreateQueue(query))
}
SqlCommand::AlterQueue(query) => {
SqlStatement::Schema(SqlSchemaCommand::AlterQueue(query))
}
SqlCommand::DropQueue(query) => {
SqlStatement::Schema(SqlSchemaCommand::DropQueue(query))
}
SqlCommand::CreateTree(query) => {
SqlStatement::Schema(SqlSchemaCommand::CreateTree(query))
}
SqlCommand::DropTree(query) => SqlStatement::Schema(SqlSchemaCommand::DropTree(query)),
SqlCommand::Probabilistic(command) => {
SqlStatement::Schema(SqlSchemaCommand::Probabilistic(command))
}
SqlCommand::SetConfig { key, value } => {
SqlStatement::Admin(SqlAdminCommand::SetConfig { key, value })
}
SqlCommand::ShowConfig { prefix, as_json } => {
SqlStatement::Admin(SqlAdminCommand::ShowConfig { prefix, as_json })
}
SqlCommand::SetSecret { key, value } => {
SqlStatement::Admin(SqlAdminCommand::SetSecret { key, value })
}
SqlCommand::DeleteSecret { key } => {
SqlStatement::Admin(SqlAdminCommand::DeleteSecret { key })
}
SqlCommand::ShowSecrets { prefix } => {
SqlStatement::Admin(SqlAdminCommand::ShowSecrets { prefix })
}
SqlCommand::SetTenant(value) => SqlStatement::Admin(SqlAdminCommand::SetTenant(value)),
SqlCommand::ShowTenant => SqlStatement::Admin(SqlAdminCommand::ShowTenant),
SqlCommand::TransactionControl(ctl) => {
SqlStatement::Admin(SqlAdminCommand::TransactionControl(ctl))
}
SqlCommand::Maintenance(cmd) => SqlStatement::Admin(SqlAdminCommand::Maintenance(cmd)),
SqlCommand::CreateSchema(q) => SqlStatement::Schema(SqlSchemaCommand::CreateSchema(q)),
SqlCommand::DropSchema(q) => SqlStatement::Schema(SqlSchemaCommand::DropSchema(q)),
SqlCommand::CreateSequence(q) => {
SqlStatement::Schema(SqlSchemaCommand::CreateSequence(q))
}
SqlCommand::DropSequence(q) => SqlStatement::Schema(SqlSchemaCommand::DropSequence(q)),
SqlCommand::CopyFrom(q) => SqlStatement::Schema(SqlSchemaCommand::CopyFrom(q)),
SqlCommand::CreateView(q) => SqlStatement::Schema(SqlSchemaCommand::CreateView(q)),
SqlCommand::DropView(q) => SqlStatement::Schema(SqlSchemaCommand::DropView(q)),
SqlCommand::RefreshMaterializedView(q) => {
SqlStatement::Schema(SqlSchemaCommand::RefreshMaterializedView(q))
}
SqlCommand::CreatePolicy(q) => SqlStatement::Schema(SqlSchemaCommand::CreatePolicy(q)),
SqlCommand::DropPolicy(q) => SqlStatement::Schema(SqlSchemaCommand::DropPolicy(q)),
SqlCommand::CreateServer(q) => SqlStatement::Schema(SqlSchemaCommand::CreateServer(q)),
SqlCommand::DropServer(q) => SqlStatement::Schema(SqlSchemaCommand::DropServer(q)),
SqlCommand::CreateForeignTable(q) => {
SqlStatement::Schema(SqlSchemaCommand::CreateForeignTable(q))
}
SqlCommand::DropForeignTable(q) => {
SqlStatement::Schema(SqlSchemaCommand::DropForeignTable(q))
}
SqlCommand::Grant(s) => SqlStatement::Admin(SqlAdminCommand::Grant(s)),
SqlCommand::Revoke(s) => SqlStatement::Admin(SqlAdminCommand::Revoke(s)),
SqlCommand::AlterUser(s) => SqlStatement::Admin(SqlAdminCommand::AlterUser(s)),
SqlCommand::CreateUser(s) => SqlStatement::Admin(SqlAdminCommand::CreateUser(s)),
SqlCommand::IamPolicy(e) => SqlStatement::Admin(SqlAdminCommand::IamPolicy(e)),
SqlCommand::CreateMigration(q) => {
SqlStatement::Schema(SqlSchemaCommand::CreateMigration(q))
}
SqlCommand::ApplyMigration(q) => {
SqlStatement::Schema(SqlSchemaCommand::ApplyMigration(q))
}
SqlCommand::RollbackMigration(q) => {
SqlStatement::Schema(SqlSchemaCommand::RollbackMigration(q))
}
SqlCommand::ExplainMigration(q) => {
SqlStatement::Schema(SqlSchemaCommand::ExplainMigration(q))
}
}
}
}
impl<'a> Parser<'a> {
fn parse_events_command(&mut self) -> Result<QueryExpr, ParseError> {
self.expect_ident()?; if self.consume_ident_ci("STATUS")? {
let mut query = TableQuery::new("red.subscriptions");
let collection = match self.peek().clone() {
Token::Ident(name) => {
self.advance()?;
Some(name)
}
Token::String(name) => {
self.advance()?;
Some(name)
}
_ => None,
};
self.parse_table_clauses(&mut query)?;
if let Some(collection) = collection {
let filter = Filter::compare(
FieldRef::column("red.subscriptions", "collection"),
CompareOp::Eq,
Value::text(collection),
);
let expr = filter_to_expr(&filter);
query.where_expr = Some(match query.where_expr.take() {
Some(existing) => Expr::binop(BinOp::And, existing, expr),
None => expr,
});
query.filter = Some(match query.filter.take() {
Some(existing) => existing.and(filter),
None => filter,
});
}
return Ok(QueryExpr::Table(query));
}
if !self.consume_ident_ci("BACKFILL")? {
return Err(ParseError::expected(
vec!["BACKFILL", "STATUS"],
self.peek(),
self.position(),
));
}
if self.consume_ident_ci("STATUS")? {
let collection = self.expect_ident()?;
return Ok(QueryExpr::EventsBackfillStatus { collection });
}
let collection = self.expect_ident()?;
let where_filter = if self.consume(&Token::Where)? {
let mut parts = Vec::new();
while !self.check(&Token::Eof) && !self.check(&Token::To) {
parts.push(self.peek().to_string());
self.advance()?;
}
if parts.is_empty() {
return Err(ParseError::expected(
vec!["predicate"],
self.peek(),
self.position(),
));
}
Some(parts.join(" "))
} else {
None
};
self.expect(Token::To)?;
let target_queue = self.expect_ident()?;
let limit = if self.consume(&Token::Limit)? {
Some(self.parse_positive_integer("LIMIT")? as u64)
} else {
None
};
Ok(QueryExpr::EventsBackfill(EventsBackfillQuery {
collection,
where_filter,
target_queue,
limit,
}))
}
pub(crate) fn parse_fdw_options_clause(&mut self) -> Result<Vec<(String, String)>, ParseError> {
if !self.consume(&Token::Options)? {
return Ok(Vec::new());
}
self.expect(Token::LParen)?;
let mut out: Vec<(String, String)> = Vec::new();
loop {
let was_ident = matches!(self.peek(), Token::Ident(_));
let raw = self.expect_ident_or_keyword()?;
let key = if was_ident {
raw
} else {
raw.to_ascii_lowercase()
};
let value = self.parse_string()?;
out.push((key, value));
if !self.consume(&Token::Comma)? {
break;
}
}
self.expect(Token::RParen)?;
Ok(out)
}
pub fn parse_frontend_statement(&mut self) -> Result<FrontendStatement, ParseError> {
match self.peek() {
Token::Select => match self.parse_select_query()? {
QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
SqlQuery::Select(query),
))),
QueryExpr::Join(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
SqlQuery::Join(query),
))),
QueryExpr::QueueSelect(query) => Ok(FrontendStatement::QueueSelect(query)),
other => Err(ParseError::new(
format!("internal: SELECT produced unexpected query kind {other:?}"),
self.position(),
)),
},
Token::From
| Token::Insert
| Token::Update
| Token::Truncate
| Token::Create
| Token::Drop
| Token::Alter
| Token::Set
| Token::Begin
| Token::Commit
| Token::Rollback
| Token::Savepoint
| Token::Release
| Token::Start
| Token::Vacuum
| Token::Analyze
| Token::Copy
| Token::Refresh => self.parse_sql_statement().map(FrontendStatement::Sql),
Token::Explain => {
if matches!(
self.peek_next()?,
Token::Ident(name) if name.eq_ignore_ascii_case("ASK")
) {
match self.parse_explain_ask_query()? {
QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
other => Err(ParseError::new(
format!(
"internal: EXPLAIN ASK produced unexpected query kind {other:?}"
),
self.position(),
)),
}
} else {
self.parse_sql_statement().map(FrontendStatement::Sql)
}
}
Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
self.parse_sql_statement().map(FrontendStatement::Sql)
}
Token::Ident(name) if name.eq_ignore_ascii_case("RESET") => {
self.parse_sql_statement().map(FrontendStatement::Sql)
}
Token::Ident(name)
if name.eq_ignore_ascii_case("RANK")
|| name.eq_ignore_ascii_case("APPROX")
|| name.eq_ignore_ascii_case("APPROXIMATE")
|| name.eq_ignore_ascii_case("ZRANK")
|| name.eq_ignore_ascii_case("ZRANGE") =>
{
self.parse_ranking_read().map(FrontendStatement::Ranking)
}
Token::Desc => self.parse_sql_statement().map(FrontendStatement::Sql),
Token::Ident(name)
if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
{
self.parse_sql_statement().map(FrontendStatement::Sql)
}
Token::Ident(name)
if name.eq_ignore_ascii_case("GRANT")
|| name.eq_ignore_ascii_case("REVOKE")
|| name.eq_ignore_ascii_case("SIMULATE")
|| name.eq_ignore_ascii_case("LINT")
|| name.eq_ignore_ascii_case("MIGRATE")
|| name.eq_ignore_ascii_case("APPLY") =>
{
self.parse_sql_statement().map(FrontendStatement::Sql)
}
Token::Ident(name) if name.eq_ignore_ascii_case("WATCH") => {
self.advance()?;
if matches!(
self.peek(),
Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
) {
match self.parse_config_watch_after_watch()? {
QueryExpr::ConfigCommand(command) => {
Ok(FrontendStatement::ConfigCommand(command))
}
other => Err(ParseError::new(
format!(
"internal: WATCH CONFIG produced unexpected query kind {other:?}"
),
self.position(),
)),
}
} else if matches!(
self.peek(),
Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
) {
match self.parse_vault_watch_after_watch()? {
QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
other => Err(ParseError::new(
format!(
"internal: WATCH VAULT produced unexpected query kind {other:?}"
),
self.position(),
)),
}
} else {
match self.parse_kv_watch(reddb_types::catalog::CollectionModel::Kv)? {
QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
other => Err(ParseError::new(
format!("internal: WATCH produced unexpected query kind {other:?}"),
self.position(),
)),
}
}
}
Token::List => {
self.advance()?;
if matches!(
self.peek(),
Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
) {
match self.parse_config_list_after_list()? {
QueryExpr::ConfigCommand(command) => {
Ok(FrontendStatement::ConfigCommand(command))
}
other => Err(ParseError::new(
format!(
"internal: LIST CONFIG produced unexpected query kind {other:?}"
),
self.position(),
)),
}
} else if matches!(self.peek(), Token::Kv) {
match self.parse_kv_list_after_list()? {
QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
other => Err(ParseError::new(
format!("internal: LIST KV produced unexpected query kind {other:?}"),
self.position(),
)),
}
} else if matches!(
self.peek(),
Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
) {
match self.parse_vault_list_after_list()? {
QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
other => Err(ParseError::new(
format!(
"internal: LIST VAULT produced unexpected query kind {other:?}"
),
self.position(),
)),
}
} else {
Err(ParseError::expected(
vec!["CONFIG", "KV", "VAULT"],
self.peek(),
self.position(),
))
}
}
Token::Ident(name) if name.eq_ignore_ascii_case("LIST") => {
self.advance()?;
if matches!(
self.peek(),
Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
) {
match self.parse_config_list_after_list()? {
QueryExpr::ConfigCommand(command) => {
Ok(FrontendStatement::ConfigCommand(command))
}
other => Err(ParseError::new(
format!(
"internal: LIST CONFIG produced unexpected query kind {other:?}"
),
self.position(),
)),
}
} else if matches!(self.peek(), Token::Kv) {
match self.parse_kv_list_after_list()? {
QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
other => Err(ParseError::new(
format!("internal: LIST KV produced unexpected query kind {other:?}"),
self.position(),
)),
}
} else if matches!(
self.peek(),
Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
) {
match self.parse_vault_list_after_list()? {
QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
other => Err(ParseError::new(
format!(
"internal: LIST VAULT produced unexpected query kind {other:?}"
),
self.position(),
)),
}
} else {
Err(ParseError::expected(
vec!["CONFIG", "KV", "VAULT"],
self.peek(),
self.position(),
))
}
}
Token::Ident(name) if name.eq_ignore_ascii_case("INVALIDATE") => {
if matches!(
self.peek_next()?,
Token::Ident(next) if next.eq_ignore_ascii_case("CONFIG")
) {
match self.parse_config_command()? {
QueryExpr::ConfigCommand(command) => {
Ok(FrontendStatement::ConfigCommand(command))
}
other => Err(ParseError::new(
format!("internal: CONFIG produced unexpected query kind {other:?}"),
self.position(),
)),
}
} else {
self.advance()?;
match self.parse_kv_invalidate_tags_after_invalidate()? {
QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
other => Err(ParseError::new(
format!(
"internal: INVALIDATE produced unexpected query kind {other:?}"
),
self.position(),
)),
}
}
}
Token::Attach | Token::Detach => self.parse_sql_statement().map(FrontendStatement::Sql),
Token::Match => match self.parse_match_query()? {
QueryExpr::Graph(query) => Ok(FrontendStatement::Graph(query)),
other => Err(ParseError::new(
format!("internal: MATCH produced unexpected query kind {other:?}"),
self.position(),
)),
},
Token::Path => match self.parse_path_query()? {
QueryExpr::Path(query) => Ok(FrontendStatement::Path(query)),
other => Err(ParseError::new(
format!("internal: PATH produced unexpected query kind {other:?}"),
self.position(),
)),
},
Token::Vector => match self.parse_vector_query()? {
QueryExpr::Vector(query) => Ok(FrontendStatement::Vector(query)),
other => Err(ParseError::new(
format!("internal: VECTOR produced unexpected query kind {other:?}"),
self.position(),
)),
},
Token::Hybrid => match self.parse_hybrid_query()? {
QueryExpr::Hybrid(query) => Ok(FrontendStatement::Hybrid(query)),
other => Err(ParseError::new(
format!("internal: HYBRID produced unexpected query kind {other:?}"),
self.position(),
)),
},
Token::Graph => match self.parse_graph_command()? {
QueryExpr::GraphCommand(command) => Ok(FrontendStatement::GraphCommand(command)),
other => Err(ParseError::new(
format!("internal: GRAPH produced unexpected query kind {other:?}"),
self.position(),
)),
},
Token::Search => match self.parse_search_command()? {
QueryExpr::SearchCommand(command) => Ok(FrontendStatement::Search(command)),
other => Err(ParseError::new(
format!("internal: SEARCH produced unexpected query kind {other:?}"),
self.position(),
)),
},
Token::Ident(name) if name.eq_ignore_ascii_case("ASK") => {
match self.parse_ask_query()? {
QueryExpr::Ask(query) => Ok(FrontendStatement::Ask(query)),
other => Err(ParseError::new(
format!("internal: ASK produced unexpected query kind {other:?}"),
self.position(),
)),
}
}
Token::Ident(name) if name.eq_ignore_ascii_case("UNSEAL") => {
match self.parse_unseal_vault_command()? {
QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
other => Err(ParseError::new(
format!("internal: UNSEAL VAULT produced unexpected query kind {other:?}"),
self.position(),
)),
}
}
Token::Queue => match self.parse_queue_command()? {
QueryExpr::QueueCommand(command) => Ok(FrontendStatement::QueueCommand(command)),
other => Err(ParseError::new(
format!("internal: QUEUE produced unexpected query kind {other:?}"),
self.position(),
)),
},
Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
match self.parse_events_command()? {
QueryExpr::Table(query) => Ok(FrontendStatement::Sql(SqlStatement::Query(
SqlQuery::Select(query),
))),
QueryExpr::EventsBackfill(query) => {
Ok(FrontendStatement::EventsBackfill(query))
}
QueryExpr::EventsBackfillStatus { collection } => {
Ok(FrontendStatement::EventsBackfillStatus { collection })
}
other => Err(ParseError::new(
format!("internal: EVENTS produced unexpected query kind {other:?}"),
self.position(),
)),
}
}
Token::Kv => match self.parse_kv_command()? {
QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
other => Err(ParseError::new(
format!("internal: KV produced unexpected query kind {other:?}"),
self.position(),
)),
},
Token::Delete => {
if matches!(
self.peek_next()?,
Token::Ident(name) if name.eq_ignore_ascii_case("CONFIG")
) {
match self.parse_config_command()? {
QueryExpr::ConfigCommand(command) => {
Ok(FrontendStatement::ConfigCommand(command))
}
other => Err(ParseError::new(
format!("internal: CONFIG produced unexpected query kind {other:?}"),
self.position(),
)),
}
} else if matches!(
self.peek_next()?,
Token::Ident(name) if name.eq_ignore_ascii_case("VAULT")
) {
match self.parse_vault_lifecycle_command()? {
QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
other => Err(ParseError::new(
format!("internal: VAULT produced unexpected query kind {other:?}"),
self.position(),
)),
}
} else {
self.parse_sql_statement().map(FrontendStatement::Sql)
}
}
Token::Add => match self.parse_config_command()? {
QueryExpr::ConfigCommand(command) => Ok(FrontendStatement::ConfigCommand(command)),
other => Err(ParseError::new(
format!("internal: CONFIG produced unexpected query kind {other:?}"),
self.position(),
)),
},
Token::Purge => match self.parse_vault_lifecycle_command()? {
QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
other => Err(ParseError::new(
format!("internal: VAULT produced unexpected query kind {other:?}"),
self.position(),
)),
},
Token::Ident(name)
if name.eq_ignore_ascii_case("PUT")
|| name.eq_ignore_ascii_case("GET")
|| name.eq_ignore_ascii_case("RESOLVE")
|| name.eq_ignore_ascii_case("ROTATE")
|| name.eq_ignore_ascii_case("HISTORY")
|| name.eq_ignore_ascii_case("PURGE")
|| name.eq_ignore_ascii_case("INCR")
|| name.eq_ignore_ascii_case("DECR")
|| name.eq_ignore_ascii_case("INVALIDATE") =>
{
if matches!(
self.peek_next()?,
Token::Ident(next) if next.eq_ignore_ascii_case("VAULT")
) {
match self.parse_vault_lifecycle_command()? {
QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
other => Err(ParseError::new(
format!("internal: VAULT produced unexpected query kind {other:?}"),
self.position(),
)),
}
} else {
match self.parse_config_command()? {
QueryExpr::ConfigCommand(command) => {
Ok(FrontendStatement::ConfigCommand(command))
}
other => Err(ParseError::new(
format!("internal: CONFIG produced unexpected query kind {other:?}"),
self.position(),
)),
}
}
}
Token::Ident(name) if name.eq_ignore_ascii_case("VAULT") => {
match self.parse_vault_command()? {
QueryExpr::KvCommand(command) => Ok(FrontendStatement::KvCommand(command)),
other => Err(ParseError::new(
format!("internal: VAULT produced unexpected query kind {other:?}"),
self.position(),
)),
}
}
Token::Tree => match self.parse_tree_command()? {
QueryExpr::TreeCommand(command) => Ok(FrontendStatement::TreeCommand(command)),
other => Err(ParseError::new(
format!("internal: TREE produced unexpected query kind {other:?}"),
self.position(),
)),
},
Token::Ident(name) if name.eq_ignore_ascii_case("HLL") => {
match self.parse_hll_command()? {
QueryExpr::ProbabilisticCommand(command) => {
Ok(FrontendStatement::ProbabilisticCommand(command))
}
other => Err(ParseError::new(
format!("internal: HLL produced unexpected query kind {other:?}"),
self.position(),
)),
}
}
Token::Ident(name) if name.eq_ignore_ascii_case("SKETCH") => {
match self.parse_sketch_command()? {
QueryExpr::ProbabilisticCommand(command) => {
Ok(FrontendStatement::ProbabilisticCommand(command))
}
other => Err(ParseError::new(
format!("internal: SKETCH produced unexpected query kind {other:?}"),
self.position(),
)),
}
}
Token::Ident(name) if name.eq_ignore_ascii_case("FILTER") => {
match self.parse_filter_command()? {
QueryExpr::ProbabilisticCommand(command) => {
Ok(FrontendStatement::ProbabilisticCommand(command))
}
other => Err(ParseError::new(
format!("internal: FILTER produced unexpected query kind {other:?}"),
self.position(),
)),
}
}
Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => self
.parse_sql_command()
.map(SqlCommand::into_statement)
.map(FrontendStatement::Sql),
other => Err(ParseError::expected(
vec![
"SELECT", "MATCH", "PATH", "FROM", "VECTOR", "HYBRID", "INSERT", "UPDATE",
"DELETE", "TRUNCATE", "CREATE", "DROP", "ALTER", "GRAPH", "SEARCH", "ASK",
"QUEUE", "EVENTS", "KV", "HLL", "TREE", "SKETCH", "FILTER", "SET", "SHOW",
"RESET", "DESCRIBE", "DESC", "RANK", "ZRANK", "ZRANGE",
],
other,
self.position(),
)),
}
}
fn parse_ranking_read(&mut self) -> Result<QueryExpr, ParseError> {
let head = self.expect_ident()?;
if head.eq_ignore_ascii_case("RANK") {
return self.parse_rank_after_rank(false);
}
if head.eq_ignore_ascii_case("APPROX") || head.eq_ignore_ascii_case("APPROXIMATE") {
if !self.consume_ident_ci("RANK")? {
return Err(ParseError::expected(
vec!["RANK"],
self.peek(),
self.position(),
));
}
return self.parse_rank_after_rank(true);
}
if head.eq_ignore_ascii_case("ZRANK") {
return self.parse_zrank();
}
if head.eq_ignore_ascii_case("ZRANGE") {
return self.parse_zrange();
}
Err(ParseError::expected(
vec!["RANK", "APPROX RANK", "ZRANK", "ZRANGE"],
self.peek(),
self.position(),
))
}
fn parse_rank_after_rank(&mut self, approximate: bool) -> Result<QueryExpr, ParseError> {
if self.consume(&Token::Of)? {
let entity_id = self.parse_u64_slot("rank entity id")?;
self.expect(Token::In)?;
let ranking = self.expect_ident()?;
let query = RankOfQuery { ranking, entity_id };
return Ok(if approximate {
QueryExpr::ApproxRankOf(query)
} else {
QueryExpr::RankOf(query)
});
}
if !approximate && self.consume(&Token::Range)? {
let lo = self.parse_positive_u64_slot("rank range lower bound")?;
self.expect(Token::To)?;
let hi = self.parse_positive_u64_slot("rank range upper bound")?;
if hi < lo {
return Err(ParseError::value_out_of_range(
"rank range upper bound",
"must be greater than or equal to the lower bound",
self.position(),
));
}
self.expect(Token::In)?;
let ranking = self.expect_ident()?;
return Ok(QueryExpr::RankRange(RankRangeQuery { ranking, lo, hi }));
}
Err(ParseError::expected(
if approximate {
vec!["OF"]
} else {
vec!["OF", "RANGE"]
},
self.peek(),
self.position(),
))
}
fn parse_zrank(&mut self) -> Result<QueryExpr, ParseError> {
let ranking = self.expect_ident()?;
let entity_id = self.parse_u64_slot("ZRANK entity id")?;
Ok(QueryExpr::RankOf(RankOfQuery { ranking, entity_id }))
}
fn parse_zrange(&mut self) -> Result<QueryExpr, ParseError> {
let ranking = self.expect_ident()?;
let start = self.parse_u64_slot("ZRANGE start")?;
let stop = self.parse_u64_slot("ZRANGE stop")?;
if stop < start {
return Err(ParseError::value_out_of_range(
"ZRANGE stop",
"must be greater than or equal to start",
self.position(),
));
}
let _with_scores = self.consume_ident_ci("WITHSCORES")?;
Ok(QueryExpr::RankRange(RankRangeQuery {
ranking,
lo: start + 1,
hi: stop + 1,
}))
}
fn parse_positive_u64_slot(&mut self, field: &'static str) -> Result<u64, ParseError> {
let value = self.parse_u64_slot(field)?;
if value == 0 {
return Err(ParseError::value_out_of_range(
field,
"must be a positive integer",
self.position(),
));
}
Ok(value)
}
fn parse_u64_slot(&mut self, field: &'static str) -> Result<u64, ParseError> {
let pos = self.position();
if matches!(self.peek(), Token::Minus | Token::Dash) {
return Err(ParseError::value_out_of_range(
field,
"must be an unsigned integer",
pos,
));
}
let raw = self.parse_integer()?;
u64::try_from(raw)
.map_err(|_| ParseError::value_out_of_range(field, "must be an unsigned integer", pos))
}
pub fn parse_sql_statement(&mut self) -> Result<SqlStatement, ParseError> {
self.parse_sql_command().map(SqlCommand::into_statement)
}
fn parse_dotted_admin_path(&mut self, lowercase: bool) -> Result<String, ParseError> {
let mut path = self.expect_ident()?;
while self.consume(&Token::Dot)? {
let next = self.expect_ident_or_keyword()?;
path = format!("{path}.{next}");
}
Ok(if lowercase {
path.to_ascii_lowercase()
} else {
path
})
}
fn normalize_secret_admin_path(path: String) -> String {
if let Some(rest) = path.strip_prefix("red.secrets.") {
format!("red.secret.{rest}")
} else if path == "red.secrets" {
"red.secret".to_string()
} else {
path
}
}
#[inline(never)]
fn parse_create_command(&mut self) -> Result<SqlCommand, ParseError> {
let pos = self.position();
self.advance()?;
let mut or_replace = false;
if self.consume(&Token::Or)? || self.consume_ident_ci("OR")? {
let _ = self.consume_ident_ci("REPLACE")?;
or_replace = true;
}
let materialized = self.consume(&Token::Materialized)?;
if self.check(&Token::View) {
self.advance()?;
let if_not_exists = self.match_if_not_exists()?;
let name = self.expect_ident()?;
let mut retention_duration_ms: Option<u64> = None;
if self.check(&Token::With) {
self.advance()?;
if !self.consume(&Token::Retention)? && !self.consume_ident_ci("RETENTION")? {
return Err(ParseError::expected(
vec!["RETENTION"],
self.peek(),
self.position(),
));
}
if !materialized {
return Err(ParseError::new(
"WITH RETENTION is only valid on \
CREATE MATERIALIZED VIEW"
.to_string(),
self.position(),
));
}
let value = self.parse_float()?;
let unit_mult = self.parse_duration_unit()?;
retention_duration_ms = Some((value * unit_mult).round() as u64);
}
if !self.consume(&Token::As)? && !self.consume_ident_ci("AS")? {
return Err(ParseError::expected(
vec!["AS"],
self.peek(),
self.position(),
));
}
let body = self.parse_sql_command()?.into_query_expr();
let mut refresh_every_ms: Option<u64> = None;
if self.check(&Token::Refresh) {
if !materialized {
return Err(ParseError::new(
"REFRESH EVERY is only valid on \
CREATE MATERIALIZED VIEW"
.to_string(),
self.position(),
));
}
self.advance()?;
if !self.consume_ident_ci("EVERY")? {
return Err(ParseError::expected(
vec!["EVERY"],
self.peek(),
self.position(),
));
}
let value = self.parse_float()?;
let unit_mult = self.parse_duration_unit()?;
refresh_every_ms = Some((value * unit_mult).round() as u64);
}
return Ok(SqlCommand::CreateView(CreateViewQuery {
name,
query: Box::new(body),
materialized,
if_not_exists,
or_replace,
refresh_every_ms,
retention_duration_ms,
}));
}
if or_replace || materialized {
return Err(ParseError::expected(
vec!["VIEW"],
self.peek(),
self.position(),
));
}
if matches!(self.peek(), Token::Ident(name) if name.eq_ignore_ascii_case("USER")) {
let stmt = self.parse_create_user_statement()?;
Ok(SqlCommand::CreateUser(stmt))
} else if self.check(&Token::Index) || self.check(&Token::Unique) {
match self.parse_create_index_query()? {
QueryExpr::CreateIndex(query) => Ok(SqlCommand::CreateIndex(query)),
other => Err(ParseError::new(
format!("internal: CREATE INDEX produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Table) {
self.expect(Token::Table)?;
match self.parse_create_table_body()? {
QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
other => Err(ParseError::new(
format!("internal: CREATE TABLE produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Graph) {
self.advance()?;
match self.parse_create_collection_model_body(CollectionModel::Graph)? {
QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
other => Err(ParseError::new(
format!("internal: CREATE GRAPH produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Document) {
self.advance()?;
match self.parse_create_collection_model_body(CollectionModel::Document)? {
QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
other => Err(ParseError::new(
format!("internal: CREATE DOCUMENT produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Vector) {
self.advance()?;
match self.parse_create_vector_body()? {
QueryExpr::CreateVector(query) => Ok(SqlCommand::CreateVector(query)),
other => Err(ParseError::new(
format!("internal: CREATE VECTOR produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Collection) {
self.advance()?;
match self.parse_create_collection_body()? {
QueryExpr::CreateCollection(query) => Ok(SqlCommand::CreateCollection(query)),
other => Err(ParseError::new(
format!("internal: CREATE COLLECTION produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Kv) {
self.advance()?;
match self.parse_create_keyed_body(CollectionModel::Kv)? {
QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
other => Err(ParseError::new(
format!("internal: CREATE KV produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.consume_ident_ci("CONFIG")? {
match self.parse_create_keyed_body(CollectionModel::Config)? {
QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
other => Err(ParseError::new(
format!("internal: CREATE CONFIG produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.consume_ident_ci("VAULT")? {
match self.parse_create_keyed_body(CollectionModel::Vault)? {
QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
other => Err(ParseError::new(
format!("internal: CREATE VAULT produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Timeseries) {
self.advance()?;
match self.parse_create_timeseries_body()? {
QueryExpr::CreateTimeSeries(query) => Ok(SqlCommand::CreateTimeSeries(query)),
other => Err(ParseError::new(
format!("internal: CREATE TIMESERIES produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Metric) {
self.advance()?;
match self.parse_create_metric_body()? {
QueryExpr::CreateMetric(query) => Ok(SqlCommand::CreateMetric(query)),
other => Err(ParseError::new(
format!("internal: CREATE METRIC produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.consume_ident_ci("METRICS")? {
match self.parse_create_metrics_body()? {
QueryExpr::CreateTable(query) => Ok(SqlCommand::CreateTable(query)),
other => Err(ParseError::new(
format!("internal: CREATE METRICS produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.consume_ident_ci("SLO")? {
match self.parse_create_slo_body()? {
QueryExpr::CreateSlo(query) => Ok(SqlCommand::CreateSlo(query)),
other => Err(ParseError::new(
format!("internal: CREATE SLO produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE")) {
self.advance()?;
match self.parse_create_hypertable_body()? {
QueryExpr::CreateTimeSeries(query) => Ok(SqlCommand::CreateTimeSeries(query)),
other => Err(ParseError::new(
format!("internal: CREATE HYPERTABLE produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Queue) {
self.advance()?;
match self.parse_create_queue_body()? {
QueryExpr::CreateQueue(query) => Ok(SqlCommand::CreateQueue(query)),
other => Err(ParseError::new(
format!("internal: CREATE QUEUE produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Tree) {
self.advance()?;
match self.parse_create_tree_body()? {
QueryExpr::CreateTree(query) => Ok(SqlCommand::CreateTree(query)),
other => Err(ParseError::new(
format!("internal: CREATE TREE produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if matches!(self.peek(), Token::Ident(n) if
n.eq_ignore_ascii_case("HLL") ||
n.eq_ignore_ascii_case("SKETCH") ||
n.eq_ignore_ascii_case("FILTER"))
{
match self.parse_create_probabilistic()? {
QueryExpr::ProbabilisticCommand(command) => Ok(SqlCommand::Probabilistic(command)),
other => Err(ParseError::new(
format!("internal: CREATE probabilistic produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Schema) {
self.advance()?;
let if_not_exists = self.match_if_not_exists()?;
let name = self.expect_ident()?;
Ok(SqlCommand::CreateSchema(CreateSchemaQuery {
name,
if_not_exists,
}))
} else if self.check(&Token::Policy) {
self.advance()?;
if matches!(self.peek(), Token::String(_)) {
let expr = self.parse_create_iam_policy_after_keywords()?;
return Ok(SqlCommand::IamPolicy(expr));
}
let name = self.expect_ident()?;
self.expect(Token::On)?;
let (target_kind, table) = {
use crate::ast::PolicyTargetKind;
let kw = match self.peek() {
Token::Ident(s) => Some(s.to_ascii_uppercase()),
_ => None,
};
let kind = kw.as_deref().and_then(|k| match k {
"NODES" => Some(PolicyTargetKind::Nodes),
"EDGES" => Some(PolicyTargetKind::Edges),
"VECTORS" => Some(PolicyTargetKind::Vectors),
"MESSAGES" => Some(PolicyTargetKind::Messages),
"POINTS" => Some(PolicyTargetKind::Points),
"DOCUMENTS" => Some(PolicyTargetKind::Documents),
_ => None,
});
if let Some(k) = kind {
self.advance()?;
self.expect(Token::Of)?;
let coll = self.expect_ident()?;
(k, coll)
} else {
let coll = self.expect_ident()?;
(PolicyTargetKind::Table, coll)
}
};
let action = if self.consume(&Token::For)? {
let a = match self.peek() {
Token::Select => {
self.advance()?;
Some(PolicyAction::Select)
}
Token::Insert => {
self.advance()?;
Some(PolicyAction::Insert)
}
Token::Update => {
self.advance()?;
Some(PolicyAction::Update)
}
Token::Delete => {
self.advance()?;
Some(PolicyAction::Delete)
}
Token::All => {
self.advance()?;
None
}
_ => None,
};
a
} else {
None
};
let role = if self.consume(&Token::To)? {
Some(self.expect_ident()?)
} else {
None
};
self.expect(Token::Using)?;
self.expect(Token::LParen)?;
let filter = self.parse_filter()?;
self.expect(Token::RParen)?;
Ok(SqlCommand::CreatePolicy(CreatePolicyQuery {
name,
table,
action,
role,
using: Box::new(filter),
target_kind,
}))
} else if self.check(&Token::Server) {
self.advance()?;
let if_not_exists = self.match_if_not_exists()?;
let name = self.expect_ident()?;
self.expect(Token::Foreign)?;
self.expect(Token::Data)?;
self.expect(Token::Wrapper)?;
let wrapper = self.expect_ident()?;
let options = self.parse_fdw_options_clause()?;
Ok(SqlCommand::CreateServer(CreateServerQuery {
name,
wrapper,
options,
if_not_exists,
}))
} else if self.check(&Token::Foreign) {
self.advance()?;
self.expect(Token::Table)?;
let if_not_exists = self.match_if_not_exists()?;
let name = self.expect_ident()?;
self.expect(Token::LParen)?;
let mut columns = Vec::new();
loop {
let col_name = self.expect_ident()?;
let data_type = self.expect_ident_or_keyword()?;
let mut not_null = false;
if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NOT")) {
self.advance()?;
if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("NULL")) {
self.advance()?;
not_null = true;
}
}
columns.push(ForeignColumnDef {
name: col_name,
data_type,
not_null,
});
if !self.consume(&Token::Comma)? {
break;
}
}
self.expect(Token::RParen)?;
self.expect(Token::Server)?;
let server = self.expect_ident()?;
let options = self.parse_fdw_options_clause()?;
Ok(SqlCommand::CreateForeignTable(CreateForeignTableQuery {
name,
server,
columns,
options,
if_not_exists,
}))
} else if self.check(&Token::Sequence) {
self.advance()?;
let if_not_exists = self.match_if_not_exists()?;
let name = self.expect_ident()?;
let mut start: i64 = 1;
let mut increment: i64 = 1;
loop {
if self.consume(&Token::Start)? {
let _ = self.consume(&Token::With)? || self.consume_ident_ci("WITH")?;
start = self.parse_integer()?;
} else if self.consume(&Token::Increment)? {
let _ = self.consume(&Token::By)? || self.consume_ident_ci("BY")?;
increment = self.parse_integer()?;
} else {
break;
}
}
Ok(SqlCommand::CreateSequence(CreateSequenceQuery {
name,
if_not_exists,
start,
increment,
}))
} else if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
self.advance()?; match self.parse_create_migration_body()? {
QueryExpr::CreateMigration(q) => Ok(SqlCommand::CreateMigration(q)),
other => Err(ParseError::new(
format!("internal: CREATE MIGRATION produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if let Some(reason) = analytics_v0_non_goal_create(self.peek()) {
Err(ParseError::new(reason, self.position()))
} else if let Some(err) =
ParseError::unsupported_recognized_token(self.peek(), self.position())
{
Err(err)
} else {
Err(ParseError::expected(
vec![
"TABLE",
"GRAPH",
"VECTOR",
"DOCUMENT",
"KV",
"COLLECTION",
"INDEX",
"UNIQUE",
"METRIC",
"TIMESERIES",
"QUEUE",
"TREE",
"HLL",
"SKETCH",
"FILTER",
"SCHEMA",
"SEQUENCE",
"USER",
"MIGRATION",
],
self.peek(),
pos,
))
}
}
pub fn parse_sql_command(&mut self) -> Result<SqlCommand, ParseError> {
match self.peek() {
Token::Select => match self.parse_select_query()? {
QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
other => Err(ParseError::new(
format!("internal: SELECT produced unexpected query kind {other:?}"),
self.position(),
)),
},
Token::From => match self.parse_from_query()? {
QueryExpr::Table(query) => Ok(SqlCommand::Select(query)),
QueryExpr::Join(query) => Ok(SqlCommand::Join(query)),
other => Err(ParseError::new(
format!("internal: FROM produced unexpected query kind {other:?}"),
self.position(),
)),
},
Token::Insert => match self.parse_insert_query()? {
QueryExpr::Insert(query) => Ok(SqlCommand::Insert(query)),
other => Err(ParseError::new(
format!("internal: INSERT produced unexpected query kind {other:?}"),
self.position(),
)),
},
Token::Update => match self.parse_update_query()? {
QueryExpr::Update(query) => Ok(SqlCommand::Update(query)),
other => Err(ParseError::new(
format!("internal: UPDATE produced unexpected query kind {other:?}"),
self.position(),
)),
},
Token::Delete => {
if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("SECRET"))
{
self.advance()?; self.advance()?; let key =
Self::normalize_secret_admin_path(self.parse_dotted_admin_path(true)?);
Ok(SqlCommand::DeleteSecret { key })
} else {
match self.parse_delete_query()? {
QueryExpr::Delete(query) => Ok(SqlCommand::Delete(query)),
other => Err(ParseError::new(
format!("internal: DELETE produced unexpected query kind {other:?}"),
self.position(),
)),
}
}
}
Token::Truncate => {
self.advance()?;
let model = if self.consume(&Token::Table)? {
Some(CollectionModel::Table)
} else if self.consume(&Token::Graph)? {
Some(CollectionModel::Graph)
} else if self.consume(&Token::Vector)? {
Some(CollectionModel::Vector)
} else if self.consume(&Token::Document)? {
Some(CollectionModel::Document)
} else if self.consume(&Token::Timeseries)? {
Some(CollectionModel::TimeSeries)
} else if self.consume_ident_ci("METRICS")? {
Some(CollectionModel::Metrics)
} else if self.consume(&Token::Kv)? {
Some(CollectionModel::Kv)
} else if self.consume(&Token::Queue)? {
Some(CollectionModel::Queue)
} else if self.consume(&Token::Collection)? {
None
} else {
return Err(ParseError::expected(
vec![
"TABLE",
"GRAPH",
"VECTOR",
"DOCUMENT",
"TIMESERIES",
"METRICS",
"KV",
"QUEUE",
"COLLECTION",
],
self.peek(),
self.position(),
));
};
match self.parse_truncate_body(model)? {
QueryExpr::Truncate(query) => Ok(SqlCommand::Truncate(query)),
other => Err(ParseError::new(
format!("internal: TRUNCATE produced unexpected kind {other:?}"),
self.position(),
)),
}
}
Token::Explain => {
if matches!(self.peek_next()?, Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION"))
{
self.advance()?; match self.parse_explain_migration_after_keyword()? {
QueryExpr::ExplainMigration(q) => Ok(SqlCommand::ExplainMigration(q)),
other => Err(ParseError::new(
format!(
"internal: EXPLAIN MIGRATION produced unexpected kind {other:?}"
),
self.position(),
)),
}
} else {
match self.parse_explain_alter_query()? {
QueryExpr::ExplainAlter(query) => Ok(SqlCommand::ExplainAlter(query)),
other => Err(ParseError::new(
format!("internal: EXPLAIN produced unexpected query kind {other:?}"),
self.position(),
)),
}
}
}
Token::Create => self.parse_create_command(),
Token::Drop => {
let pos = self.position();
self.advance()?;
let materialized = self.consume(&Token::Materialized)?;
if self.check(&Token::View) {
self.advance()?;
let if_exists = self.match_if_exists()?;
let name = self.expect_ident()?;
return Ok(SqlCommand::DropView(DropViewQuery {
name,
materialized,
if_exists,
}));
}
if materialized {
return Err(ParseError::expected(
vec!["VIEW"],
self.peek(),
self.position(),
));
}
if self.check(&Token::Index) {
match self.parse_drop_index_query()? {
QueryExpr::DropIndex(query) => Ok(SqlCommand::DropIndex(query)),
other => Err(ParseError::new(
format!("internal: DROP INDEX produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Table) {
self.expect(Token::Table)?;
match self.parse_drop_table_body()? {
QueryExpr::DropTable(query) => Ok(SqlCommand::DropTable(query)),
other => Err(ParseError::new(
format!("internal: DROP TABLE produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Graph) {
self.advance()?;
match self.parse_drop_graph_body()? {
QueryExpr::DropGraph(query) => Ok(SqlCommand::DropGraph(query)),
other => Err(ParseError::new(
format!("internal: DROP GRAPH produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Vector) {
self.advance()?;
match self.parse_drop_vector_body()? {
QueryExpr::DropVector(query) => Ok(SqlCommand::DropVector(query)),
other => Err(ParseError::new(
format!("internal: DROP VECTOR produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Document) {
self.advance()?;
match self.parse_drop_document_body()? {
QueryExpr::DropDocument(query) => Ok(SqlCommand::DropDocument(query)),
other => Err(ParseError::new(
format!("internal: DROP DOCUMENT produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Kv) {
self.advance()?;
match self.parse_drop_kv_body()? {
QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
other => Err(ParseError::new(
format!("internal: DROP KV produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.consume_ident_ci("CONFIG")? {
match self.parse_drop_keyed_body(CollectionModel::Config)? {
QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
other => Err(ParseError::new(
format!("internal: DROP CONFIG produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.consume_ident_ci("VAULT")? {
match self.parse_drop_keyed_body(CollectionModel::Vault)? {
QueryExpr::DropKv(query) => Ok(SqlCommand::DropKv(query)),
other => Err(ParseError::new(
format!("internal: DROP VAULT produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Collection) {
self.advance()?;
match self.parse_drop_collection_body()? {
QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
other => Err(ParseError::new(
format!("internal: DROP COLLECTION produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Timeseries) {
self.advance()?;
match self.parse_drop_timeseries_body()? {
QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
other => Err(ParseError::new(
format!("internal: DROP TIMESERIES produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.consume_ident_ci("METRICS")? {
match self.parse_drop_collection_model_body(Some(CollectionModel::Metrics))? {
QueryExpr::DropCollection(query) => Ok(SqlCommand::DropCollection(query)),
other => Err(ParseError::new(
format!("internal: DROP METRICS produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if matches!(self.peek(), Token::Ident(s) if s.eq_ignore_ascii_case("HYPERTABLE"))
{
self.advance()?;
match self.parse_drop_timeseries_body()? {
QueryExpr::DropTimeSeries(query) => Ok(SqlCommand::DropTimeSeries(query)),
other => Err(ParseError::new(
format!("internal: DROP HYPERTABLE produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Queue) {
self.advance()?;
match self.parse_drop_queue_body()? {
QueryExpr::DropQueue(query) => Ok(SqlCommand::DropQueue(query)),
other => Err(ParseError::new(
format!("internal: DROP QUEUE produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if self.check(&Token::Tree) {
self.advance()?;
match self.parse_drop_tree_body()? {
QueryExpr::DropTree(query) => Ok(SqlCommand::DropTree(query)),
other => Err(ParseError::new(
format!("internal: DROP TREE produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if matches!(self.peek(), Token::Ident(n) if
n.eq_ignore_ascii_case("HLL") ||
n.eq_ignore_ascii_case("SKETCH") ||
n.eq_ignore_ascii_case("FILTER"))
{
match self.parse_drop_probabilistic()? {
QueryExpr::ProbabilisticCommand(command) => {
Ok(SqlCommand::Probabilistic(command))
}
other => Err(ParseError::new(
format!(
"internal: DROP probabilistic produced unexpected kind {other:?}"
),
self.position(),
)),
}
} else if self.check(&Token::Schema) {
self.advance()?;
let if_exists = self.match_if_exists()?;
let name = self.expect_ident()?;
let cascade = self.consume(&Token::Cascade)?;
Ok(SqlCommand::DropSchema(DropSchemaQuery {
name,
if_exists,
cascade,
}))
} else if self.check(&Token::Policy) {
self.advance()?;
if matches!(self.peek(), Token::String(_)) {
let expr = self.parse_drop_iam_policy_after_keywords()?;
return Ok(SqlCommand::IamPolicy(expr));
}
let if_exists = self.match_if_exists()?;
let name = self.expect_ident()?;
self.expect(Token::On)?;
let table = self.expect_ident()?;
Ok(SqlCommand::DropPolicy(DropPolicyQuery {
name,
table,
if_exists,
}))
} else if self.check(&Token::Server) {
self.advance()?;
let if_exists = self.match_if_exists()?;
let name = self.expect_ident()?;
let cascade = self.consume(&Token::Cascade)?;
Ok(SqlCommand::DropServer(DropServerQuery {
name,
if_exists,
cascade,
}))
} else if self.check(&Token::Foreign) {
self.advance()?;
self.expect(Token::Table)?;
let if_exists = self.match_if_exists()?;
let name = self.expect_ident()?;
Ok(SqlCommand::DropForeignTable(DropForeignTableQuery {
name,
if_exists,
}))
} else if self.check(&Token::Sequence) {
self.advance()?;
let if_exists = self.match_if_exists()?;
let name = self.expect_ident()?;
Ok(SqlCommand::DropSequence(DropSequenceQuery {
name,
if_exists,
}))
} else if let Some(err) =
ParseError::unsupported_recognized_token(self.peek(), self.position())
{
Err(err)
} else {
Err(ParseError::expected(
vec![
"TABLE",
"INDEX",
"TIMESERIES",
"QUEUE",
"TREE",
"HLL",
"SKETCH",
"FILTER",
"SCHEMA",
"SEQUENCE",
],
self.peek(),
pos,
))
}
}
Token::Alter => {
let next = self.peek_next()?.clone();
if matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("USER")) {
self.advance()?; let stmt = self.parse_alter_user_statement()?;
Ok(SqlCommand::AlterUser(stmt))
} else if matches!(next, Token::Queue) {
self.advance()?; self.advance()?; match self.parse_alter_queue_body()? {
QueryExpr::AlterQueue(query) => Ok(SqlCommand::AlterQueue(query)),
other => Err(ParseError::new(
format!("internal: ALTER QUEUE produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if matches!(next, Token::Metric) {
self.advance()?; self.advance()?; match self.parse_alter_metric_body()? {
QueryExpr::AlterMetric(query) => Ok(SqlCommand::AlterMetric(query)),
other => Err(ParseError::new(
format!("internal: ALTER METRIC produced unexpected kind {other:?}"),
self.position(),
)),
}
} else if matches!(next, Token::Graph) {
match self.parse_alter_graph_query()? {
QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
other => Err(ParseError::new(
format!(
"internal: ALTER GRAPH produced unexpected query kind {other:?}"
),
self.position(),
)),
}
} else if matches!(next, Token::Table)
|| matches!(next, Token::Collection)
|| matches!(next, Token::Ident(ref s) if s.eq_ignore_ascii_case("COLLECTION"))
{
match self.parse_alter_table_query()? {
QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
other => Err(ParseError::new(
format!(
"internal: ALTER TABLE produced unexpected query kind {other:?}"
),
self.position(),
)),
}
} else if let Some(err) =
ParseError::unsupported_recognized_token(&next, self.position())
{
Err(err)
} else {
match self.parse_alter_table_query()? {
QueryExpr::AlterTable(query) => Ok(SqlCommand::AlterTable(query)),
other => Err(ParseError::new(
format!("internal: ALTER produced unexpected query kind {other:?}"),
self.position(),
)),
}
}
}
Token::Ident(name) if name.eq_ignore_ascii_case("GRANT") => {
let stmt = self.parse_grant_statement()?;
Ok(SqlCommand::Grant(stmt))
}
Token::Ident(name) if name.eq_ignore_ascii_case("REVOKE") => {
let stmt = self.parse_revoke_statement()?;
Ok(SqlCommand::Revoke(stmt))
}
Token::Ident(name) if name.eq_ignore_ascii_case("EVENTS") => {
self.advance()?;
if self.consume_ident_ci("BACKFILL")? {
return Err(ParseError::new(
"EVENTS BACKFILL STATUS is not implemented; EVENTS BACKFILL runtime is available but durable progress tracking is not"
.to_string(),
self.position(),
));
}
if !self.consume_ident_ci("STATUS")? {
return Err(ParseError::expected(
vec!["STATUS"],
self.peek(),
self.position(),
));
}
let mut query = TableQuery::new("red.subscriptions");
let collection = match self.peek().clone() {
Token::Ident(name) => {
self.advance()?;
Some(name)
}
Token::String(name) => {
self.advance()?;
Some(name)
}
_ => None,
};
self.parse_table_clauses(&mut query)?;
if let Some(collection) = collection {
let filter = Filter::compare(
FieldRef::column("red.subscriptions", "collection"),
CompareOp::Eq,
Value::text(collection),
);
let expr = filter_to_expr(&filter);
query.where_expr = Some(match query.where_expr.take() {
Some(existing) => Expr::binop(BinOp::And, existing, expr),
None => expr,
});
query.filter = Some(match query.filter.take() {
Some(existing) => existing.and(filter),
None => filter,
});
}
Ok(SqlCommand::Select(query))
}
Token::Attach => {
let expr = self.parse_attach_policy()?;
Ok(SqlCommand::IamPolicy(expr))
}
Token::Detach => {
let expr = self.parse_detach_policy()?;
Ok(SqlCommand::IamPolicy(expr))
}
Token::Ident(name) if name.eq_ignore_ascii_case("SIMULATE") => {
let expr = self.parse_simulate_policy()?;
Ok(SqlCommand::IamPolicy(expr))
}
Token::Ident(name) if name.eq_ignore_ascii_case("LINT") => {
let expr = self.parse_lint_policy()?;
Ok(SqlCommand::IamPolicy(expr))
}
Token::Ident(name) if name.eq_ignore_ascii_case("MIGRATE") => {
let next = self.peek_next()?.clone();
let is_policy_mode = matches!(&next, Token::Policy)
|| matches!(&next, Token::Ident(name)
if name.eq_ignore_ascii_case("POLICY"));
if is_policy_mode {
let expr = self.parse_migrate_policy_mode()?;
return Ok(SqlCommand::IamPolicy(expr));
}
Err(ParseError::expected(
vec!["POLICY"],
self.peek(),
self.position(),
))
}
Token::Set => {
self.advance()?;
if self.consume_ident_ci("CONFIG")? {
let full_key = self.parse_dotted_admin_path(true)?;
self.expect(Token::Eq)?;
let value = self.parse_literal_value()?;
Ok(SqlCommand::SetConfig {
key: full_key,
value,
})
} else if self.consume_ident_ci("SECRET")? {
let key =
Self::normalize_secret_admin_path(self.parse_dotted_admin_path(true)?);
self.expect(Token::Eq)?;
let value = self.parse_literal_value()?;
Ok(SqlCommand::SetSecret { key, value })
} else if self.consume_ident_ci("TENANT")? {
let _ = self.consume(&Token::Eq)?;
if self.consume_ident_ci("NULL")? {
Ok(SqlCommand::SetTenant(None))
} else {
let value = self.parse_literal_value()?;
match value {
Value::Text(s) => Ok(SqlCommand::SetTenant(Some(s.to_string()))),
Value::Null => Ok(SqlCommand::SetTenant(None)),
other => Err(ParseError::new(
format!("SET TENANT expects a text literal or NULL, got {other:?}"),
self.position(),
)),
}
}
} else {
Err(ParseError::expected(
vec!["CONFIG", "SECRET", "TENANT"],
self.peek(),
self.position(),
))
}
}
Token::Ident(name) if name.eq_ignore_ascii_case("APPLY") => {
self.advance()?;
match self.parse_apply_migration()? {
QueryExpr::ApplyMigration(q) => Ok(SqlCommand::ApplyMigration(q)),
other => Err(ParseError::new(
format!("internal: APPLY MIGRATION produced unexpected kind {other:?}"),
self.position(),
)),
}
}
Token::Ident(name) if name.eq_ignore_ascii_case("RESET") => {
self.advance()?;
if self.consume_ident_ci("TENANT")? {
Ok(SqlCommand::SetTenant(None))
} else {
Err(ParseError::expected(
vec!["TENANT"],
self.peek(),
self.position(),
))
}
}
Token::Ident(name)
if name.eq_ignore_ascii_case("DESCRIBE") || name.eq_ignore_ascii_case("DESC") =>
{
self.advance()?;
let collection = self.parse_dotted_admin_path(false)?;
let mut query = TableQuery::new("red.describe");
query.filter = Some(Filter::compare(
FieldRef::column("", "collection"),
CompareOp::Eq,
Value::text(collection),
));
Ok(SqlCommand::Select(query))
}
Token::Desc => {
self.advance()?;
let collection = self.parse_dotted_admin_path(false)?;
let mut query = TableQuery::new("red.describe");
query.filter = Some(Filter::compare(
FieldRef::column("", "collection"),
CompareOp::Eq,
Value::text(collection),
));
Ok(SqlCommand::Select(query))
}
Token::Ident(name) if name.eq_ignore_ascii_case("SHOW") => {
self.advance()?;
if self.consume(&Token::Create)? || self.consume_ident_ci("CREATE")? {
if !(self.consume(&Token::Table)? || self.consume_ident_ci("TABLE")?) {
return Err(ParseError::expected(
vec!["TABLE"],
self.peek(),
self.position(),
));
}
let collection = self.parse_dotted_admin_path(false)?;
let mut query = TableQuery::new("red.show_create");
query.filter = Some(Filter::compare(
FieldRef::column("", "collection"),
CompareOp::Eq,
Value::text(collection),
));
Ok(SqlCommand::Select(query))
} else if self.consume_ident_ci("CONFIG")? {
let prefix = if !(self.check(&Token::Eof)
|| self.check(&Token::As)
|| self.check(&Token::Format))
{
let first = self.expect_ident()?;
let mut full = first;
while self.consume(&Token::Dot)? {
let next = self.expect_ident_or_keyword()?;
full = format!("{full}.{next}");
}
Some(full.to_ascii_lowercase())
} else {
None
};
let as_json = if self.consume(&Token::As)? || self.consume(&Token::Format)? {
if !self.consume(&Token::Json)? {
return Err(ParseError::expected(
vec!["JSON"],
self.peek(),
self.position(),
));
}
true
} else {
false
};
Ok(SqlCommand::ShowConfig { prefix, as_json })
} else if self.consume_ident_ci("COLLECTIONS")? {
let mut query = TableQuery::new("red.collections");
let include_internal = if self.consume_ident_ci("INCLUDING")? {
if !self.consume_ident_ci("INTERNAL")? {
return Err(ParseError::expected(
vec!["INTERNAL"],
self.peek(),
self.position(),
));
}
true
} else {
false
};
self.parse_table_clauses(&mut query)?;
if !include_internal {
let user_filter = query.filter.take();
let hide_internal = crate::ast::Filter::Compare {
field: FieldRef::column("", "internal"),
op: CompareOp::Eq,
value: Value::Boolean(false),
};
query.filter = Some(match user_filter {
Some(filter) => filter.and(hide_internal),
None => hide_internal,
});
}
Ok(SqlCommand::Select(query))
} else if self.consume_ident_ci("TABLES")? {
Ok(SqlCommand::Select(parse_show_collections_by_model(
self, "table",
)?))
} else if self.consume_ident_ci("QUEUES")? {
let mut query = TableQuery::new("red.queues");
let include_internal = if self.consume_ident_ci("INCLUDING")? {
if !self.consume_ident_ci("INTERNAL")? {
return Err(ParseError::expected(
vec!["INTERNAL"],
self.peek(),
self.position(),
));
}
true
} else {
false
};
self.parse_table_clauses(&mut query)?;
if !include_internal {
let hide_internal = Filter::Compare {
field: FieldRef::column("", "internal"),
op: CompareOp::Eq,
value: Value::Boolean(false),
};
add_table_filter(&mut query, hide_internal);
}
Ok(SqlCommand::Select(query))
} else if self.consume(&Token::Vectors)? || self.consume_ident_ci("VECTORS")? {
Ok(SqlCommand::Select(parse_show_collections_by_model(
self, "vector",
)?))
} else if self.consume_ident_ci("DOCUMENTS")? {
Ok(SqlCommand::Select(parse_show_collections_by_model(
self, "document",
)?))
} else if self.consume(&Token::Timeseries)?
|| self.consume_ident_ci("TIMESERIES")?
{
Ok(SqlCommand::Select(parse_show_collections_by_model(
self,
"timeseries",
)?))
} else if self.consume_ident_ci("GRAPHS")? {
Ok(SqlCommand::Select(parse_show_collections_by_model(
self, "graph",
)?))
} else if self.consume_ident_ci("CONFIGS")? {
Ok(SqlCommand::Select(parse_show_collections_by_model(
self, "config",
)?))
} else if self.consume_ident_ci("VAULTS")? {
Ok(SqlCommand::Select(parse_show_collections_by_model(
self, "vault",
)?))
} else if self.consume(&Token::Kv)?
|| self.consume_ident_ci("KV")?
|| self.consume_ident_ci("KVS")?
{
Ok(SqlCommand::Select(parse_show_collections_by_model(
self, "kv",
)?))
} else if self.consume(&Token::Schema)? || self.consume_ident_ci("SCHEMA")? {
let collection = self.parse_dotted_admin_path(false)?;
let mut query = TableQuery::new("red.columns");
query.filter = Some(Filter::compare(
FieldRef::column("", "collection"),
CompareOp::Eq,
Value::text(collection),
));
Ok(SqlCommand::Select(query))
} else if self.consume_ident_ci("INDICES")? || self.consume_ident_ci("INDEXES")? {
let mut query = TableQuery::new("red.show_indexes");
if self.consume(&Token::On)? {
let collection = self.expect_ident_or_keyword()?;
let filter = Filter::Compare {
field: FieldRef::column("", "table"),
op: CompareOp::Eq,
value: Value::text(collection),
};
query.where_expr = Some(filter_to_expr(&filter));
query.filter = Some(filter);
}
self.parse_table_clauses(&mut query)?;
Ok(SqlCommand::Select(query))
} else if self.consume_ident_ci("POLICIES")? {
if self.consume(&Token::For)? || self.consume_ident_ci("FOR")? {
let principal = self.parse_iam_principal_kind()?;
return Ok(SqlCommand::IamPolicy(QueryExpr::ShowPolicies {
filter: Some(principal),
}));
}
let mut query = TableQuery::new("red.policies");
let collection_filter =
if self.consume(&Token::On)? || self.consume_ident_ci("ON")? {
let collection = self.parse_dotted_admin_path(false)?;
Some(Filter::Compare {
field: FieldRef::TableColumn {
table: String::new(),
column: "collection".to_string(),
},
op: CompareOp::Eq,
value: Value::text(collection),
})
} else {
None
};
self.parse_table_clauses(&mut query)?;
if let Some(collection_filter) = collection_filter {
let combined = match query.filter.take() {
Some(existing) => {
Filter::And(Box::new(collection_filter), Box::new(existing))
}
None => collection_filter,
};
query.where_expr = Some(filter_to_expr(&combined));
query.filter = Some(combined);
}
Ok(SqlCommand::Select(query))
} else if self.consume_ident_ci("STATS")? {
let mut query = TableQuery::new("red.stats");
let collection = match self.peek().clone() {
Token::Ident(name) => {
self.advance()?;
Some(name)
}
Token::String(name) => {
self.advance()?;
Some(name)
}
_ => None,
};
self.parse_table_clauses(&mut query)?;
if let Some(collection) = collection {
let filter = Filter::compare(
FieldRef::column("red.stats", "collection"),
CompareOp::Eq,
Value::text(collection),
);
let expr = filter_to_expr(&filter);
query.where_expr = Some(match query.where_expr.take() {
Some(existing) => Expr::binop(BinOp::And, existing, expr),
None => expr,
});
query.filter = Some(match query.filter.take() {
Some(existing) => existing.and(filter),
None => filter,
});
}
Ok(SqlCommand::Select(query))
} else if self.consume_ident_ci("SAMPLE")? {
let mut query = TableQuery::new(&self.expect_ident()?);
query.limit = if self.consume(&Token::Limit)? {
Some(self.parse_integer()? as u64)
} else {
Some(10)
};
Ok(SqlCommand::Select(query))
} else if self.consume_ident_ci("SECRET")? || self.consume_ident_ci("SECRETS")? {
let prefix = if !self.check(&Token::Eof) {
Some(Self::normalize_secret_admin_path(
self.parse_dotted_admin_path(true)?,
))
} else {
None
};
Ok(SqlCommand::ShowSecrets { prefix })
} else if self.consume_ident_ci("TENANT")? {
Ok(SqlCommand::ShowTenant)
} else if let Some(expr) = self.parse_show_iam_after_show()? {
Ok(SqlCommand::IamPolicy(expr))
} else {
Err(ParseError::expected(
vec![
"CONFIG",
"SECRET",
"SECRETS",
"COLLECTIONS",
"TABLES",
"QUEUES",
"VECTORS",
"DOCUMENTS",
"TIMESERIES",
"GRAPHS",
"KV",
"SCHEMA",
"INDICES",
"INDEXES",
"SAMPLE",
"POLICIES",
"STATS",
"TENANT",
"EFFECTIVE",
],
self.peek(),
self.position(),
))
}
}
Token::Begin | Token::Start => {
self.advance()?;
let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
if self.consume_ident_ci("ISOLATION")? {
self.expect(Token::Level)?;
let mut parts: Vec<String> = Vec::new();
if self.consume_ident_ci("READ")? {
parts.push("READ".to_string());
if self.consume_ident_ci("UNCOMMITTED")? {
parts.push("UNCOMMITTED".to_string());
} else if self.consume_ident_ci("COMMITTED")? {
parts.push("COMMITTED".to_string());
} else {
return Err(ParseError::expected(
vec!["UNCOMMITTED", "COMMITTED"],
self.peek(),
self.position(),
));
}
} else if self.consume_ident_ci("REPEATABLE")? {
parts.push("REPEATABLE".to_string());
if !self.consume_ident_ci("READ")? {
return Err(ParseError::expected(
vec!["READ"],
self.peek(),
self.position(),
));
}
parts.push("READ".to_string());
} else if self.consume_ident_ci("SNAPSHOT")? {
parts.push("SNAPSHOT".to_string());
} else if self.consume_ident_ci("SERIALIZABLE")? {
return Err(ParseError::new(
"ISOLATION LEVEL SERIALIZABLE is not yet supported — reddb \
currently provides SNAPSHOT ISOLATION (which PG calls \
REPEATABLE READ). Use REPEATABLE READ / SNAPSHOT / \
READ COMMITTED, or omit ISOLATION LEVEL for the default."
.to_string(),
self.position(),
));
} else {
return Err(ParseError::expected(
vec!["READ", "REPEATABLE", "SNAPSHOT", "SERIALIZABLE"],
self.peek(),
self.position(),
));
}
let _ = parts;
}
Ok(SqlCommand::TransactionControl(TxnControl::Begin))
}
Token::Commit => {
self.advance()?;
let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
Ok(SqlCommand::TransactionControl(TxnControl::Commit))
}
Token::Rollback => {
self.advance()?;
if matches!(self.peek(), Token::Ident(n) if n.eq_ignore_ascii_case("MIGRATION")) {
match self.parse_rollback_migration_after_keyword()? {
QueryExpr::RollbackMigration(q) => Ok(SqlCommand::RollbackMigration(q)),
other => Err(ParseError::new(
format!(
"internal: ROLLBACK MIGRATION produced unexpected kind {other:?}"
),
self.position(),
)),
}
} else {
let _ = self.consume(&Token::Work)? || self.consume(&Token::Transaction)?;
if self.consume(&Token::To)? {
let _ = self.consume(&Token::Savepoint)?;
let name = self.expect_ident()?;
Ok(SqlCommand::TransactionControl(
TxnControl::RollbackToSavepoint(name),
))
} else {
Ok(SqlCommand::TransactionControl(TxnControl::Rollback))
}
}
}
Token::Savepoint => {
self.advance()?;
let name = self.expect_ident()?;
Ok(SqlCommand::TransactionControl(TxnControl::Savepoint(name)))
}
Token::Release => {
self.advance()?;
let _ = self.consume(&Token::Savepoint)?;
let name = self.expect_ident()?;
Ok(SqlCommand::TransactionControl(
TxnControl::ReleaseSavepoint(name),
))
}
Token::Vacuum => {
self.advance()?;
let full = self.consume(&Token::Full)?;
let target = if self.check(&Token::Eof) {
None
} else {
Some(self.expect_ident()?)
};
Ok(SqlCommand::Maintenance(MaintenanceCommand::Vacuum {
target,
full,
}))
}
Token::Refresh => {
self.advance()?;
self.expect(Token::Materialized)?;
self.expect(Token::View)?;
let name = self.expect_ident()?;
Ok(SqlCommand::RefreshMaterializedView(
RefreshMaterializedViewQuery { name },
))
}
Token::Analyze => {
self.advance()?;
let target = if self.check(&Token::Eof) {
None
} else {
Some(self.expect_ident()?)
};
Ok(SqlCommand::Maintenance(MaintenanceCommand::Analyze {
target,
}))
}
Token::Copy => {
self.advance()?;
let table = self.expect_ident()?;
self.expect(Token::From)?;
let path = self.parse_string()?;
let mut delimiter: Option<char> = None;
let mut has_header = false;
let format = CopyFormat::Csv;
if self.consume(&Token::With)? || self.consume_ident_ci("WITH")? {
self.expect(Token::LParen)?;
loop {
if self.consume(&Token::Format)? || self.consume_ident_ci("FORMAT")? {
let _ = self.consume(&Token::Eq)?;
let _ = self.expect_ident()?;
} else if self.consume(&Token::Header)? {
let _ = self.consume(&Token::Eq)?;
has_header = match self.peek().clone() {
Token::True => {
self.advance()?;
true
}
Token::False => {
self.advance()?;
false
}
Token::Ident(ref n) if n.eq_ignore_ascii_case("true") => {
self.advance()?;
true
}
Token::Ident(ref n) if n.eq_ignore_ascii_case("false") => {
self.advance()?;
false
}
_ => true,
};
} else if self.consume(&Token::Delimiter)? {
let _ = self.consume(&Token::Eq)?;
let s = self.parse_string()?;
delimiter = s.chars().next();
} else {
break;
}
if !self.consume(&Token::Comma)? {
break;
}
}
self.expect(Token::RParen)?;
}
loop {
if self.consume(&Token::Delimiter)? {
let s = self.parse_string()?;
delimiter = s.chars().next();
} else if self.consume(&Token::Header)? {
has_header = true;
} else {
break;
}
}
Ok(SqlCommand::CopyFrom(CopyFromQuery {
table,
path,
format,
delimiter,
has_header,
}))
}
other => Err(ParseError::expected(
vec![
"SELECT",
"FROM",
"INSERT",
"UPDATE",
"DELETE",
"EXPLAIN",
"CREATE",
"DROP",
"ALTER",
"SET",
"SHOW",
"BEGIN",
"COMMIT",
"ROLLBACK",
"SAVEPOINT",
"RELEASE",
"START",
"VACUUM",
"ANALYZE",
"COPY",
"REFRESH",
"DESCRIBE",
"DESC",
],
other,
self.position(),
)),
}
}
}