use pgwire::api::results::{Response, Tag};
use pgwire::error::PgWireResult;
use nodedb_sql::ddl_ast::statement::{
AutomationStmt, CollectionStmt, NodedbStatement, PolicyStmt, StreamViewStmt,
};
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use crate::types::DatabaseId;
use super::exists::{
alert_exists, change_stream_exists, collection_exists, continuous_aggregate_exists,
materialized_view_exists, retention_policy_exists, schedule_exists, sequence_exists,
trigger_exists,
};
pub(super) fn try_dispatch_guards(
state: &SharedState,
identity: &AuthenticatedIdentity,
stmt: &NodedbStatement,
database_id: DatabaseId,
) -> Option<PgWireResult<Vec<Response>>> {
match stmt {
NodedbStatement::Collection(CollectionStmt::CreateCollection {
name,
if_not_exists: true,
..
}) => {
if collection_exists(state, identity, name, database_id) {
return Some(Ok(vec![Response::Execution(Tag::new("CREATE COLLECTION"))]));
}
None }
NodedbStatement::Collection(CollectionStmt::CreateTable {
name,
if_not_exists: true,
..
}) => {
if collection_exists(state, identity, name, database_id) {
return Some(Ok(vec![Response::Execution(Tag::new("CREATE TABLE"))]));
}
None }
NodedbStatement::Collection(CollectionStmt::CreateSequence {
name,
if_not_exists: true,
..
}) => {
if sequence_exists(state, identity, name) {
return Some(Ok(vec![Response::Execution(Tag::new("CREATE SEQUENCE"))]));
}
None
}
NodedbStatement::Collection(CollectionStmt::DropIndex {
if_exists: true, ..
}) => None,
NodedbStatement::Automation(AutomationStmt::DropTrigger {
name,
if_exists: true,
..
}) => {
if !trigger_exists(state, identity, name) {
return Some(Ok(vec![Response::Execution(Tag::new("DROP TRIGGER"))]));
}
None
}
NodedbStatement::Automation(AutomationStmt::DropSchedule {
name,
if_exists: true,
}) => {
if !schedule_exists(state, identity, name) {
return Some(Ok(vec![Response::Execution(Tag::new("DROP SCHEDULE"))]));
}
None
}
NodedbStatement::Collection(CollectionStmt::DropSequence {
name,
if_exists: true,
}) => {
if !sequence_exists(state, identity, name) {
return Some(Ok(vec![Response::Execution(Tag::new("DROP SEQUENCE"))]));
}
None
}
NodedbStatement::Automation(AutomationStmt::DropAlert {
name,
if_exists: true,
}) => {
if !alert_exists(state, identity, name) {
return Some(Ok(vec![Response::Execution(Tag::new("DROP ALERT"))]));
}
None
}
NodedbStatement::Policy(PolicyStmt::DropRetentionPolicy {
name,
if_exists: true,
}) => {
if !retention_policy_exists(state, identity, name) {
return Some(Ok(vec![Response::Execution(Tag::new(
"DROP RETENTION POLICY",
))]));
}
None
}
NodedbStatement::StreamView(StreamViewStmt::DropChangeStream {
name,
if_exists: true,
}) => {
if !change_stream_exists(state, identity, name) {
return Some(Ok(vec![Response::Execution(Tag::new(
"DROP CHANGE STREAM",
))]));
}
None
}
NodedbStatement::StreamView(StreamViewStmt::DropMaterializedView {
name,
if_exists: true,
}) => {
if !materialized_view_exists(state, identity, name) {
return Some(Ok(vec![Response::Execution(Tag::new(
"DROP MATERIALIZED VIEW",
))]));
}
None
}
NodedbStatement::StreamView(StreamViewStmt::DropContinuousAggregate {
name,
if_exists: true,
}) => {
if !continuous_aggregate_exists(state, identity, name) {
return Some(Ok(vec![Response::Execution(Tag::new(
"DROP CONTINUOUS AGGREGATE",
))]));
}
None
}
NodedbStatement::Policy(PolicyStmt::DropRlsPolicy {
name,
collection,
if_exists: true,
}) => {
let tid = identity.tenant_id.as_u64();
if !state.rls.policy_exists(tid, collection, name) {
return Some(Ok(vec![Response::Execution(Tag::new("DROP RLS POLICY"))]));
}
None
}
NodedbStatement::StreamView(StreamViewStmt::DropConsumerGroup {
name,
stream,
if_exists: true,
}) => {
let tid = identity.tenant_id.as_u64();
if state.group_registry.get(tid, stream, name).is_none() {
return Some(Ok(vec![Response::Execution(Tag::new(
"DROP CONSUMER GROUP",
))]));
}
None
}
_ => None,
}
}