use pgwire::api::results::Response;
use pgwire::error::PgWireResult;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
/// Routes change-stream, consumer-group and topic statements to their handlers.
///
/// The statement is recognised purely by inspecting `upper`
/// (NOTE(review): presumably the upper-cased form of `sql` — confirm with the
/// caller). Arms are tried top to bottom and the first one that matches wins.
///
/// # Parameters
/// - `state`, `identity`: forwarded unchanged to whichever handler is selected.
/// - `sql`: the statement text; only `create_topic` and `handle_publish`
///   receive it.
/// - `upper`: the text used for keyword matching.
/// - `parts`: forwarded to most handlers (NOTE(review): presumably the
///   whitespace-split tokens of the statement — confirm with the caller).
///
/// # Returns
/// `Some(result)` carrying the selected handler's result, or `None` when the
/// statement matches none of the forms handled here.
pub(super) async fn dispatch(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    sql: &str,
    upper: &str,
    parts: &[&str],
) -> Option<PgWireResult<Vec<Response>>> {
    // True for `SELECT ... <source_clause>... CONSUMER GROUP ...` statements.
    fn is_group_select(stmt: &str, source_clause: &str) -> bool {
        stmt.starts_with("SELECT ")
            && stmt.contains(source_clause)
            && stmt.contains("CONSUMER GROUP")
    }

    let response = match upper {
        // --- Change streams ---
        stmt if stmt.starts_with("DROP CHANGE STREAM ") => {
            super::super::change_stream::drop_change_stream(state, identity, parts)
        }
        stmt if stmt.starts_with("SHOW CHANGE STREAM") => {
            super::super::change_stream::show_change_streams(state, identity)
        }

        // --- Consumer groups ---
        stmt if stmt.starts_with("DROP CONSUMER GROUP ") => {
            super::super::consumer_group::drop_consumer_group(state, identity, parts)
        }
        stmt if stmt.starts_with("SHOW CONSUMER GROUPS ") => {
            super::super::consumer_group::show_consumer_groups(state, identity, parts)
        }
        stmt if stmt.starts_with("SHOW PARTITIONS ") => {
            super::super::consumer_group::show_partitions(state, identity, parts)
        }
        stmt if stmt.starts_with("COMMIT OFFSET ") || stmt.starts_with("COMMIT OFFSETS ") => {
            super::super::consumer_group::commit_offset(state, identity, parts)
        }

        // Stream reads are checked before topic reads, so a statement that
        // contains both source clauses is handled as a stream read.
        stmt if is_group_select(stmt, "FROM STREAM ") => {
            super::super::stream_select::select_from_stream(state, identity, parts).await
        }

        // --- Topics ---
        stmt if stmt.starts_with("CREATE TOPIC ") => {
            super::super::topic::create_topic(state, identity, parts, sql)
        }
        stmt if stmt.starts_with("DROP TOPIC ") => {
            super::super::topic::drop_topic(state, identity, parts)
        }
        stmt if stmt.starts_with("SHOW TOPIC") => {
            super::super::topic::show_topics(state, identity)
        }
        stmt if stmt.starts_with("PUBLISH TO ") => {
            super::super::topic::handle_publish(state, identity, sql).await
        }
        stmt if is_group_select(stmt, "FROM TOPIC ") => {
            super::helpers::select_from_topic(state, identity, parts).await
        }

        // Not one of the statements handled here.
        _ => return None,
    };

    Some(response)
}