use std::sync::Arc;
use futures::stream;
use pgwire::api::results::{DataRowEncoder, QueryResponse, Response, Tag};
use pgwire::error::PgWireResult;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::state::SharedState;
use super::super::types::{sqlstate_error, text_field};
/// Entry point for the `CREATE ORG` / `ALTER ORG` / `DROP ORG` command family.
///
/// The first token of `parts` is uppercased and used to select the handler;
/// the complete, unmodified token slice is forwarded to that handler.
///
/// # Errors
/// Returns SQLSTATE `42601` when `parts` is empty or when the leading token is
/// not `CREATE`, `ALTER` or `DROP`. Otherwise returns whatever the selected
/// handler returns.
pub fn handle_org(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
) -> PgWireResult<Vec<Response>> {
    let verb = match parts.first() {
        Some(token) => token.to_uppercase(),
        None => return Err(sqlstate_error("42601", "empty org command")),
    };

    if verb == "CREATE" {
        create_org(state, identity, parts)
    } else if verb == "ALTER" {
        alter_org(state, identity, parts)
    } else if verb == "DROP" {
        drop_org(state, identity, parts)
    } else {
        Err(sqlstate_error(
            "42601",
            "expected CREATE ORG, ALTER ORG, or DROP ORG",
        ))
    }
}
/// Handles `CREATE ORG '<name>' [IN TENANT <id>]`.
///
/// Requires a superuser identity. The org name is `parts[2]` with surrounding
/// single quotes stripped; it is passed twice to `state.orgs.create_org`
/// (presumably as both id and display name — confirm against the org store).
/// When no `TENANT` keyword follows the name, the org is created in the
/// caller's own tenant. A successful create is recorded in the audit log.
///
/// # Errors
/// * `42501` — the caller is not a superuser.
/// * `42601` — fewer than three tokens were supplied, or a `TENANT` keyword is
///   present but is not followed by a valid `u32` tenant id.
/// * `23505` — the org store returned an error for the create.
fn create_org(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
) -> PgWireResult<Vec<Response>> {
    if !identity.is_superuser {
        return Err(sqlstate_error(
            "42501",
            "permission denied: requires superuser",
        ));
    }
    if parts.len() < 3 {
        return Err(sqlstate_error(
            "42601",
            "syntax: CREATE ORG '<name>' [IN TENANT <id>]",
        ));
    }
    let org_id = parts[2].trim_matches('\'');

    // Scan only the tokens after the org name, so an org literally named
    // `tenant` is not mistaken for the start of the clause.
    let clause = &parts[3..];
    let tenant_id = match clause.iter().position(|p| p.to_uppercase() == "TENANT") {
        // No clause: default to the tenant of the acting identity.
        None => identity.tenant_id.as_u32(),
        // An explicit clause must carry a usable id. Silently falling back to
        // the caller's tenant here would create the org in the wrong tenant.
        Some(i) => clause
            .get(i + 1)
            .and_then(|s| s.parse::<u32>().ok())
            .ok_or_else(|| {
                sqlstate_error("42601", "syntax: CREATE ORG '<name>' [IN TENANT <id>]")
            })?,
    };

    state
        .orgs
        .create_org(org_id, org_id, tenant_id)
        .map_err(|e| sqlstate_error("23505", &e.to_string()))?;

    state.audit_record(
        crate::control::security::audit::AuditEvent::AdminAction,
        Some(identity.tenant_id),
        &identity.username,
        &format!("created org '{org_id}' in tenant {tenant_id}"),
    );

    Ok(vec![Response::Execution(Tag::new("CREATE ORG"))])
}
/// Handles `ALTER ORG '<id>' SET STATUS <status>`.
///
/// Requires a superuser identity. The org id is `parts[2]` with surrounding
/// single quotes stripped; the status is `parts[5]` lowercased. A successful
/// change is recorded in the audit log.
///
/// # Errors
/// * `42501` — the caller is not a superuser.
/// * `42601` — fewer than six tokens were supplied, or tokens 3 and 4 are not
///   the keywords `SET STATUS` (compared case-insensitively via uppercasing).
/// * `XX000` — the org store returned an error.
/// * `42704` — the org store reported that no such org exists.
fn alter_org(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
) -> PgWireResult<Vec<Response>> {
    if !identity.is_superuser {
        return Err(sqlstate_error(
            "42501",
            "permission denied: requires superuser",
        ));
    }
    // Checking only the token count would let any two filler words stand in
    // for `SET STATUS`, so the keywords themselves are verified as well.
    let well_formed = parts.len() >= 6
        && parts[3].to_uppercase() == "SET"
        && parts[4].to_uppercase() == "STATUS";
    if !well_formed {
        return Err(sqlstate_error(
            "42601",
            "syntax: ALTER ORG '<id>' SET STATUS <status>",
        ));
    }

    let org_id = parts[2].trim_matches('\'');
    let status = parts[5].to_lowercase();

    let found = state
        .orgs
        .set_status(org_id, &status)
        .map_err(|e| sqlstate_error("XX000", &e.to_string()))?;
    if !found {
        return Err(sqlstate_error(
            "42704",
            &format!("org '{org_id}' not found"),
        ));
    }

    state.audit_record(
        crate::control::security::audit::AuditEvent::AdminAction,
        Some(identity.tenant_id),
        &identity.username,
        &format!("org '{org_id}' status set to {status}"),
    );

    Ok(vec![Response::Execution(Tag::new("ALTER ORG"))])
}
/// Handles `DROP ORG '<org_id>'`.
///
/// Requires a superuser identity. The org id is the third token with
/// surrounding single quotes stripped. A successful drop is recorded in the
/// audit log.
///
/// # Errors
/// * `42501` — the caller is not a superuser.
/// * `42601` — fewer than three tokens were supplied.
/// * `XX000` — the org store returned an error.
/// * `42704` — the org store reported that no such org exists.
fn drop_org(
    state: &SharedState,
    identity: &AuthenticatedIdentity,
    parts: &[&str],
) -> PgWireResult<Vec<Response>> {
    use crate::control::security::audit::AuditEvent;

    if !identity.is_superuser {
        return Err(sqlstate_error(
            "42501",
            "permission denied: requires superuser",
        ));
    }

    let org_id = match parts.get(2) {
        Some(raw) => raw.trim_matches('\''),
        None => return Err(sqlstate_error("42601", "syntax: DROP ORG '<org_id>'")),
    };

    match state.orgs.drop_org(org_id) {
        Ok(true) => {}
        Ok(false) => {
            let message = format!("org '{org_id}' not found");
            return Err(sqlstate_error("42704", &message));
        }
        Err(e) => return Err(sqlstate_error("XX000", &e.to_string())),
    }

    let detail = format!("dropped org '{org_id}'");
    state.audit_record(
        AuditEvent::AdminAction,
        Some(identity.tenant_id),
        &identity.username,
        &detail,
    );

    let tag = Tag::new("DROP ORG");
    Ok(vec![Response::Execution(tag)])
}
pub fn show_orgs(
state: &SharedState,
_identity: &AuthenticatedIdentity,
parts: &[&str],
) -> PgWireResult<Vec<Response>> {
let tenant_filter = parts
.iter()
.position(|p| p.to_uppercase() == "TENANT")
.and_then(|i| parts.get(i + 1))
.and_then(|s| s.parse::<u32>().ok());
let orgs = state.orgs.list(tenant_filter);
let schema = Arc::new(vec![
text_field("org_id"),
text_field("name"),
text_field("tenant_id"),
text_field("status"),
]);
let rows: Vec<_> = orgs
.iter()
.map(|o| {
let mut enc = DataRowEncoder::new(schema.clone());
let _ = enc.encode_field(&o.org_id);
let _ = enc.encode_field(&o.name);
let _ = enc.encode_field(&o.tenant_id.to_string());
let _ = enc.encode_field(&o.status);
Ok(enc.take_row())
})
.collect();
Ok(vec![Response::Query(QueryResponse::new(
schema,
stream::iter(rows),
))])
}
/// Lists the members of one organization as a four-column text result set
/// (`user_id`, `org_id`, `role`, `joined_at`).
///
/// The org id is the token following the first `ORG` token in `parts`
/// (matched case-insensitively via uppercasing), with surrounding single
/// quotes stripped. The identity argument is not consulted by this function.
///
/// # Errors
/// Returns SQLSTATE `42601` when no token follows an `ORG` keyword. A row
/// whose fields fail to encode is surfaced as an `Err` item in the returned
/// row stream rather than being emitted half-encoded.
pub fn show_members(
    state: &SharedState,
    _identity: &AuthenticatedIdentity,
    parts: &[&str],
) -> PgWireResult<Vec<Response>> {
    let org_id = parts
        .iter()
        .position(|p| p.to_uppercase() == "ORG")
        .and_then(|i| parts.get(i + 1))
        .map(|s| s.trim_matches('\''))
        .ok_or_else(|| sqlstate_error("42601", "syntax: SHOW MEMBERS OF ORG '<org_id>'"))?;

    let members = state.orgs.members_of(org_id);

    let schema = Arc::new(vec![
        text_field("user_id"),
        text_field("org_id"),
        text_field("role"),
        text_field("joined_at"),
    ]);

    let rows: Vec<_> = members
        .iter()
        .map(|m| -> PgWireResult<_> {
            let mut enc = DataRowEncoder::new(schema.clone());
            // Propagate encode failures instead of discarding them; a skipped
            // field would leave the row with fewer columns than the schema.
            enc.encode_field(&m.auth_user_id)?;
            enc.encode_field(&m.org_id)?;
            enc.encode_field(&m.role)?;
            enc.encode_field(&m.joined_at.to_string())?;
            Ok(enc.take_row())
        })
        .collect();

    Ok(vec![Response::Query(QueryResponse::new(
        schema,
        stream::iter(rows),
    ))])
}