use pgwire::api::results::Response;
use pgwire::error::PgWireResult;
use crate::bridge::physical_plan::DocumentOp;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
pub async fn dispatch(
state: &SharedState,
identity: &AuthenticatedIdentity,
sql: &str,
) -> Option<PgWireResult<Vec<Response>>> {
let upper = sql.to_uppercase();
let parts: Vec<&str> = sql.split_whitespace().collect();
if upper.starts_with("CREATE USER ") {
return Some(super::user::create_user(state, identity, &parts));
}
if upper.starts_with("ALTER USER ") {
return Some(super::user::alter_user(state, identity, &parts));
}
if upper.starts_with("DROP USER ") {
return Some(super::user::drop_user(state, identity, &parts));
}
if upper.starts_with("CREATE SERVICE ACCOUNT ") {
return Some(super::service_account::create_service_account(
state, identity, &parts,
));
}
if upper.starts_with("DROP SERVICE ACCOUNT ") {
return Some(super::service_account::drop_service_account(
state, identity, &parts,
));
}
if upper.starts_with("CREATE TENANT ") {
return Some(super::tenant::create_tenant(state, identity, &parts));
}
if upper.starts_with("ALTER TENANT ") {
return Some(super::tenant::alter_tenant(state, identity, &parts));
}
if upper.starts_with("DROP TENANT ") {
return Some(super::tenant::drop_tenant(state, identity, &parts));
}
if upper.starts_with("GRANT ") {
return Some(super::grant::handle_grant(state, identity, &parts));
}
if upper.starts_with("REVOKE ") {
return Some(super::grant::handle_revoke(state, identity, &parts));
}
if upper.starts_with("CREATE ROLE ") {
return Some(super::role::create_role(state, identity, &parts));
}
if upper.starts_with("ALTER ROLE ") {
return Some(super::role::alter_role(state, identity, &parts));
}
if upper.starts_with("DROP ROLE ") {
return Some(super::role::drop_role(state, identity, &parts));
}
if upper.starts_with("BACKUP TENANT ") {
return Some(super::backup::backup_tenant(state, identity, &parts));
}
if upper.starts_with("RESTORE TENANT ") {
if upper.ends_with(" DRY RUN") || upper.ends_with(" DRYRUN") {
return Some(super::backup::restore_tenant_dry_run(
state, identity, &parts,
));
}
return Some(super::backup::restore_tenant(state, identity, &parts));
}
if upper.starts_with("DESCRIBE ") || upper.starts_with("\\D ") {
return Some(super::collection::describe_collection(
state, identity, &parts,
));
}
if upper.starts_with("CREATE COLLECTION ") {
return Some(super::collection::create_collection(
state, identity, &parts, sql,
));
}
if upper.starts_with("DROP COLLECTION ") {
return Some(super::collection::drop_collection(state, identity, &parts));
}
if upper == "SHOW COLLECTIONS" || upper.starts_with("SHOW COLLECTIONS") {
return Some(super::collection::show_collections(state, identity));
}
if upper.starts_with("CREATE TIMESERIES ") {
return Some(super::timeseries::create_timeseries(
state, identity, &parts,
));
}
if upper.starts_with("SHOW PARTITIONS ") {
return Some(super::timeseries::show_partitions(state, identity, &parts));
}
if upper.starts_with("ALTER TIMESERIES ") {
return Some(super::timeseries::alter_timeseries(state, identity, &parts));
}
if upper.starts_with("REWRITE PARTITIONS ") {
return Some(super::timeseries::rewrite_partitions(
state, identity, &parts,
));
}
if upper.starts_with("CREATE TOPIC ") {
return Some(super::pubsub::create_topic(state, identity, &parts));
}
if upper.starts_with("DROP TOPIC ") {
return Some(super::pubsub::drop_topic(state, identity, &parts));
}
if upper == "SHOW TOPICS" || upper.starts_with("SHOW TOPICS") {
return Some(super::pubsub::show_topics(state));
}
if upper.starts_with("PUBLISH TO ") {
return Some(super::pubsub::publish_to(state, identity, sql, &parts));
}
if upper.starts_with("SUBSCRIBE TO ") {
return Some(super::pubsub::subscribe_to(state, identity, sql, &parts));
}
if upper.starts_with("CREATE INDEX ") || upper.starts_with("CREATE UNIQUE INDEX ") {
return Some(super::collection::create_index(
state, identity, &parts, sql,
));
}
if upper.starts_with("DROP INDEX ") {
return Some(super::collection::drop_index(state, identity, &parts));
}
if upper.starts_with("SHOW INDEXES") || upper.starts_with("SHOW INDEX") {
return Some(super::collection::show_indexes(state, identity, &parts));
}
if upper.starts_with("ALTER COLLECTION ") && upper.contains("OWNER TO") {
return Some(super::ownership::alter_collection_owner(
state, identity, &parts,
));
}
if upper.starts_with("ALTER TABLE ") && (upper.contains("ADD COLUMN") || upper.contains("ADD "))
{
return Some(super::collection::alter_table_add_column(
state, identity, &parts, sql,
));
}
if upper.starts_with("CREATE RLS POLICY ") {
return Some(super::rls::create_rls_policy(state, identity, &parts));
}
if upper.starts_with("DROP RLS POLICY ") {
return Some(super::rls::drop_rls_policy(state, identity, &parts));
}
if upper.starts_with("SHOW RLS POLICIES") || upper.starts_with("SHOW RLS POLICY") {
return Some(super::rls::show_rls_policies(state, identity, &parts));
}
if upper.starts_with("BLACKLIST ") {
return Some(super::blacklist_ddl::handle_blacklist(
state, identity, &parts,
));
}
if upper.starts_with("SHOW BLACKLIST") {
return Some(super::blacklist_ddl::show_blacklist(
state, identity, &parts,
));
}
if upper.starts_with("DEACTIVATE AUTH USER ") || upper.starts_with("ALTER AUTH USER ") {
return Some(super::auth_user_ddl::handle_auth_user(
state, identity, &parts,
));
}
if upper.starts_with("PURGE AUTH USERS ") {
return Some(super::auth_user_ddl::purge_auth_users(
state, identity, &parts,
));
}
if upper.starts_with("SHOW AUTH USERS") {
return Some(super::auth_user_ddl::show_auth_users(
state, identity, &parts,
));
}
if upper.starts_with("CREATE ORG ")
|| upper.starts_with("ALTER ORG ")
|| upper.starts_with("DROP ORG ")
{
return Some(super::org_ddl::handle_org(state, identity, &parts));
}
if upper.starts_with("SHOW ORGS") {
return Some(super::org_ddl::show_orgs(state, identity, &parts));
}
if upper.starts_with("SHOW MEMBERS OF ORG") {
return Some(super::org_ddl::show_members(state, identity, &parts));
}
if upper.starts_with("DEFINE SCOPE ") {
return Some(super::scope_ddl::define_scope(state, identity, &parts));
}
if upper.starts_with("DROP SCOPE ") {
return Some(super::scope_ddl::drop_scope(state, identity, &parts));
}
if upper.starts_with("GRANT SCOPE ") {
return Some(super::scope_ddl::grant_scope(state, identity, &parts));
}
if upper.starts_with("REVOKE SCOPE ") {
return Some(super::scope_ddl::revoke_scope(state, identity, &parts));
}
if upper.starts_with("ALTER SCOPE ") {
return Some(super::scope_query_ddl::alter_scope(state, identity, &parts));
}
if upper.starts_with("SHOW MY SCOPES") {
return Some(super::scope_query_ddl::show_my_scopes(
state, identity, &parts,
));
}
if upper.starts_with("SHOW SCOPES FOR ") {
return Some(super::scope_query_ddl::show_scopes_for(
state, identity, &parts,
));
}
if upper.starts_with("RENEW SCOPE ") {
return Some(super::scope_ddl::renew_scope(state, identity, &parts));
}
if upper.starts_with("EXPLAIN PERMISSION ") {
return Some(super::explain_ddl::explain_permission(
state, identity, &parts,
));
}
if upper.starts_with("EXPLAIN SCOPE ") {
return Some(super::explain_ddl::explain_scope(state, identity, &parts));
}
if upper.starts_with("EMERGENCY LOCKDOWN") {
return Some(super::emergency_ddl::emergency_lockdown(
state, identity, &parts,
));
}
if upper.starts_with("EMERGENCY UNLOCK") {
return Some(super::emergency_ddl::emergency_unlock(
state, identity, &parts,
));
}
if upper.starts_with("BLACKLIST AUTH USERS WHERE") {
return Some(super::emergency_ddl::bulk_blacklist(
state, identity, &parts,
));
}
if upper.starts_with("CREATE AUTH KEY ") {
return Some(super::auth_key_ddl::create_auth_key(
state, identity, &parts,
));
}
if upper.starts_with("ROTATE AUTH KEY ") {
return Some(super::auth_key_ddl::rotate_auth_key(
state, identity, &parts,
));
}
if upper.starts_with("LIST AUTH KEYS") {
return Some(super::auth_key_ddl::list_auth_keys(state, identity, &parts));
}
if upper.starts_with("IMPERSONATE AUTH USER ") {
return Some(super::impersonation_ddl::impersonate(
state, identity, &parts,
));
}
if upper.starts_with("STOP IMPERSONATION") {
return Some(super::impersonation_ddl::stop_impersonation(
state, identity, &parts,
));
}
if upper.starts_with("DELEGATE AUTH USER ") {
return Some(super::impersonation_ddl::delegate(state, identity, &parts));
}
if upper.starts_with("REVOKE DELEGATION ") {
return Some(super::impersonation_ddl::revoke_delegation(
state, identity, &parts,
));
}
if upper.starts_with("SHOW DELEGATIONS") {
return Some(super::impersonation_ddl::show_delegations(
state, identity, &parts,
));
}
if upper.starts_with("SHOW SESSIONS") {
return Some(super::session_ddl::show_sessions(state, identity, &parts));
}
if upper.starts_with("KILL SESSION ") {
return Some(super::session_ddl::kill_session(state, identity, &parts));
}
if upper.starts_with("KILL USER SESSIONS ") {
return Some(super::session_ddl::kill_user_sessions(
state, identity, &parts,
));
}
if upper.starts_with("VERIFY AUDIT CHAIN") {
return Some(super::session_ddl::verify_audit_chain(
state, identity, &parts,
));
}
if upper.starts_with("DEFINE METERING DIMENSION ") {
return Some(super::metering_ddl::define_dimension(
state, identity, &parts,
));
}
if upper.starts_with("SHOW USAGE ") {
return Some(super::metering_ddl::show_usage(state, identity, &parts));
}
if upper.starts_with("SHOW QUOTA ") {
return Some(super::metering_ddl::show_quota(state, identity, &parts));
}
if upper.starts_with("SHOW SCOPE GRANTS") {
return Some(super::scope_ddl::show_scope_grants(state, identity, &parts));
}
if upper.starts_with("SHOW SCOPE") {
return Some(super::scope_ddl::show_scopes(state, identity, &parts));
}
if upper.starts_with("CREATE API KEY ") {
return Some(super::apikey::create_api_key(state, identity, &parts));
}
if upper.starts_with("REVOKE API KEY ") {
return Some(super::apikey::revoke_api_key(state, identity, &parts));
}
if upper.starts_with("LIST API KEYS") {
return Some(super::apikey::list_api_keys(state, identity, &parts));
}
if upper.starts_with("SHOW CLUSTER") {
return Some(super::cluster::show_cluster(state, identity));
}
if upper.starts_with("SHOW RAFT GROUPS") {
return Some(super::cluster::show_raft_groups(state, identity));
}
if upper.starts_with("SHOW RAFT GROUP ") {
return Some(super::cluster::show_raft_group(state, identity, &parts));
}
if upper.starts_with("ALTER RAFT GROUP ") {
return Some(super::cluster::alter_raft_group(state, identity, &parts));
}
if upper.starts_with("SHOW MIGRATIONS") {
return Some(super::cluster::show_migrations(state, identity));
}
if upper.starts_with("REBALANCE") {
return Some(super::cluster::rebalance(state, identity));
}
if upper.starts_with("SHOW PEER HEALTH") {
return Some(super::cluster::show_peer_health(state, identity));
}
if upper.starts_with("SHOW NODES") {
return Some(super::cluster::show_nodes(state, identity));
}
if upper.starts_with("SHOW NODE ") {
return Some(super::cluster::show_node(state, identity, &parts));
}
if upper.starts_with("REMOVE NODE ") {
return Some(super::cluster::remove_node(state, identity, &parts));
}
if upper.starts_with("SHOW USERS") {
return Some(super::inspect::show_users(state, identity));
}
if upper.starts_with("SHOW TENANTS") {
return Some(super::inspect::show_tenants(state, identity));
}
if upper.starts_with("SHOW SESSION") {
return Some(super::inspect::show_session(identity));
}
if upper.starts_with("TRUNCATE AUDIT")
|| upper.starts_with("DELETE AUDIT")
|| upper.starts_with("CLEAR AUDIT")
{
return Some(Err(super::super::types::sqlstate_error(
"42501",
"audit log cannot be manually truncated. Entries are pruned automatically by the retention policy (audit_retention_days in config).",
)));
}
if upper.starts_with("EXPORT AUDIT") {
return Some(super::inspect::export_audit_log(state, identity, &parts));
}
if upper.starts_with("SHOW AUDIT LOG") || upper.starts_with("SHOW AUDIT_LOG") {
return Some(super::inspect::show_audit_log(state, identity, &parts));
}
if upper.starts_with("SHOW PERMISSIONS") {
return Some(super::inspect::show_permissions(state, identity, &parts));
}
if upper.starts_with("SHOW GRANTS") {
return Some(super::inspect::show_grants(state, identity, &parts));
}
if upper.starts_with("SEARCH ") && upper.contains("USING VECTOR") {
return Some(super::dsl::search_vector(state, identity, sql).await);
}
if upper.starts_with("SEARCH ") && upper.contains("USING FUSION") {
return Some(super::dsl::search_fusion(state, identity, sql).await);
}
if upper.starts_with("CREATE VECTOR INDEX ") {
return Some(super::dsl::create_vector_index(state, identity, &parts));
}
if upper.starts_with("CREATE FULLTEXT INDEX ") {
return Some(super::dsl::create_fulltext_index(state, identity, &parts));
}
if upper.starts_with("CREATE SPATIAL INDEX ") {
return Some(super::spatial::create_spatial_index(
state, identity, &parts,
));
}
if upper.starts_with("CRDT MERGE ") {
return Some(super::dsl::crdt_merge(state, identity, &parts).await);
}
if upper.starts_with("SELECT CRDT_STATE(") || upper.starts_with("SELECT CRDT_STATE (") {
return Some(super::crdt_ops::crdt_state(state, identity, sql).await);
}
if upper.starts_with("SELECT CRDT_APPLY(") || upper.starts_with("SELECT CRDT_APPLY (") {
return Some(super::crdt_ops::crdt_apply(state, identity, sql).await);
}
if upper.starts_with("GRAPH INSERT EDGE ") {
return Some(super::graph_ops::insert_edge(state, identity, &parts, sql).await);
}
if upper.starts_with("GRAPH DELETE EDGE ") {
return Some(super::graph_ops::delete_edge(state, identity, &parts, sql).await);
}
if upper.starts_with("GRAPH TRAVERSE ") {
return Some(super::graph_ops::traverse(state, identity, &parts, sql).await);
}
if upper.starts_with("GRAPH NEIGHBORS ") {
return Some(super::graph_ops::neighbors(state, identity, &parts, sql).await);
}
if upper.starts_with("GRAPH PATH ") {
return Some(super::graph_ops::shortest_path(state, identity, &parts, sql).await);
}
if upper.starts_with("GRAPH ALGO ") {
return Some(super::graph_ops::algo(state, identity, &parts, sql).await);
}
if upper.starts_with("MATCH ") || upper.starts_with("OPTIONAL MATCH ") {
return Some(super::match_ops::match_query(state, identity, sql).await);
}
if upper.starts_with("COPY ") && upper.contains(" FROM ") {
return Some(super::bulk::copy_from(state, identity, &parts).await);
}
if upper.starts_with("INSERT INTO ")
&& upper.contains("VALUES")
&& let Some(result) = super::collection_insert::insert_document(state, identity, sql).await
{
return Some(result);
}
if upper.starts_with("UPSERT INTO ")
&& upper.contains("VALUES")
&& let Some(result) = super::collection_insert::upsert_document(state, identity, sql).await
{
return Some(result);
}
if upper.starts_with("SHOW CHANGES ") {
if let Some(coll_name) = super::sql_parse::extract_collection_after(sql, " FOR ") {
let since_ms: u64 = if let Some(since_pos) = upper.find(" SINCE ") {
let since_str = sql[since_pos + 7..]
.split_whitespace()
.next()
.unwrap_or("0");
match super::sql_parse::parse_since_timestamp(since_str) {
Ok(ms) => ms,
Err(msg) => {
return Some(Err(super::super::types::sqlstate_error(
"22007",
&msg.to_string(),
)));
}
}
} else {
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0);
now_ms.saturating_sub(86_400 * 1000)
};
let limit = upper
.find(" LIMIT ")
.and_then(|pos| sql[pos + 7..].split_whitespace().next())
.and_then(|s| s.parse::<usize>().ok())
.unwrap_or(1000);
let changes = state
.change_stream
.query_changes(Some(&coll_name), since_ms, limit);
use futures::stream;
use pgwire::api::results::{DataRowEncoder, QueryResponse, Response};
let schema = std::sync::Arc::new(vec![
super::super::types::text_field("collection"),
super::super::types::text_field("operation"),
super::super::types::text_field("document_id"),
super::super::types::text_field("timestamp_ms"),
super::super::types::text_field("lsn"),
]);
let mut rows = Vec::with_capacity(changes.len());
for change in &changes {
let mut encoder = DataRowEncoder::new(schema.clone());
let _ = encoder.encode_field(&change.collection);
let _ = encoder.encode_field(&change.operation.as_str().to_string());
let _ = encoder.encode_field(&change.document_id);
let _ = encoder.encode_field(&change.timestamp_ms.to_string());
let _ = encoder.encode_field(&change.lsn.as_u64().to_string());
rows.push(Ok(encoder.take_row()));
}
return Some(Ok(vec![Response::Query(QueryResponse::new(
schema,
stream::iter(rows),
))]));
}
return Some(Err(super::super::types::sqlstate_error(
"42601",
"syntax: SHOW CHANGES FOR <collection> [SINCE <timestamp>]",
)));
}
if upper.starts_with("LIVE SELECT ") {
if let Some(coll_name) = super::sql_parse::extract_collection_after(sql, " FROM ") {
let tenant_id = identity.tenant_id;
let sub = state
.change_stream
.subscribe(Some(coll_name.clone()), Some(tenant_id));
let sub_id = sub.id;
let channel = format!("live_{coll_name}");
use futures::stream;
use pgwire::api::results::{DataRowEncoder, QueryResponse, Response};
let schema = std::sync::Arc::new(vec![
super::super::types::text_field("subscription_id"),
super::super::types::text_field("channel"),
super::super::types::text_field("collection"),
super::super::types::text_field("status"),
]);
let mut encoder = DataRowEncoder::new(schema.clone());
let _ = encoder.encode_field(&sub_id.to_string());
let _ = encoder.encode_field(&channel);
let _ = encoder.encode_field(&coll_name);
let _ = encoder.encode_field(&"active");
let row = encoder.take_row();
return Some(Ok(vec![Response::Query(QueryResponse::new(
schema,
stream::iter(vec![Ok(row)]),
))]));
}
return Some(Err(super::super::types::sqlstate_error(
"42601",
"syntax: LIVE SELECT [*|fields] FROM <collection> [WHERE ...]",
)));
}
if upper.starts_with("DEFINE FIELD ") {
return Some(super::field_def::define_field(state, identity, sql));
}
if upper.starts_with("DEFINE EVENT ") {
return Some(super::field_def::define_event(state, identity, sql));
}
if upper.starts_with("SELECT ESTIMATE_COUNT(") || upper.starts_with("SELECT ESTIMATE_COUNT (") {
let inner = sql
.find('(')
.and_then(|start| sql.rfind(')').map(|end| &sql[start + 1..end]));
if let Some(args_str) = inner {
let args: Vec<&str> = args_str
.split(',')
.map(|s| s.trim().trim_matches('\''))
.collect();
if args.len() >= 2 {
let coll = args[0].to_lowercase();
let field = args[1].to_string();
let tenant_id = identity.tenant_id;
let vshard = crate::types::VShardId::from_collection(&coll);
let plan =
crate::bridge::envelope::PhysicalPlan::Document(DocumentOp::EstimateCount {
collection: coll,
field,
});
match crate::control::server::dispatch_utils::dispatch_to_data_plane(
state, tenant_id, vshard, plan, 0,
)
.await
{
Ok(resp) => {
let payload_text =
crate::data::executor::response_codec::decode_payload_to_json(
&resp.payload,
);
use futures::stream;
use pgwire::api::results::{DataRowEncoder, QueryResponse, Response};
let schema = std::sync::Arc::new(vec![super::super::types::text_field(
"estimate_count",
)]);
let mut encoder = DataRowEncoder::new(schema.clone());
let _ = encoder.encode_field(&payload_text);
let row = encoder.take_row();
return Some(Ok(vec![Response::Query(QueryResponse::new(
schema,
stream::iter(vec![Ok(row)]),
))]));
}
Err(e) => {
return Some(Err(super::super::types::sqlstate_error(
"XX000",
&e.to_string(),
)));
}
}
}
}
return Some(Err(super::super::types::sqlstate_error(
"42601",
"usage: SELECT ESTIMATE_COUNT('collection', 'field')",
)));
}
if upper.starts_with("TRUNCATE ") {
let coll_name = parts.get(1).map(|s| s.to_lowercase()).unwrap_or_default();
if coll_name.is_empty() {
return Some(Err(super::super::types::sqlstate_error(
"42601",
"TRUNCATE requires a collection name",
)));
}
let tenant_id = identity.tenant_id;
let vshard = crate::types::VShardId::from_collection(&coll_name);
let plan = crate::bridge::envelope::PhysicalPlan::Document(DocumentOp::Truncate {
collection: coll_name,
});
if let Err(e) = crate::control::server::dispatch_utils::dispatch_to_data_plane(
state, tenant_id, vshard, plan, 0,
)
.await
{
return Some(Err(super::super::types::sqlstate_error(
"XX000",
&e.to_string(),
)));
}
return Some(Ok(vec![pgwire::api::results::Response::Execution(
pgwire::api::results::Tag::new("TRUNCATE"),
)]));
}
None
}