use axum::extract::State;
use axum::http::HeaderMap;
use axum::response::IntoResponse;
use crate::bridge::envelope::{PhysicalPlan, Status};
use crate::control::security::identity::{required_permission, role_grants_permission};
use crate::types::VShardId;
use super::super::auth::{ApiError, AppState, resolve_identity};
pub async fn query(
headers: HeaderMap,
State(state): State<AppState>,
axum::Json(body): axum::Json<serde_json::Value>,
) -> Result<impl IntoResponse, ApiError> {
let identity = resolve_identity(&headers, &state, "http")?;
let trace_id = crate::control::trace_context::extract_from_headers(&headers);
let sql = body["sql"]
.as_str()
.ok_or_else(|| ApiError::BadRequest("missing 'sql' field".into()))?;
if let Some(result) =
crate::control::server::pgwire::ddl::dispatch(&state.shared, &identity, sql.trim()).await
{
return match result {
Ok(responses) => {
let json_rows = responses_to_json(responses);
Ok(axum::Json(serde_json::json!({
"status": "ok",
"rows": json_rows,
})))
}
Err(e) => Err(ApiError::BadRequest(e.to_string())),
};
}
let tenant_id = identity.tenant_id;
let mut auth_ctx = crate::control::server::session_auth::build_auth_context(&identity);
let clean_sql =
crate::control::server::session_auth::extract_and_apply_on_deny(sql, &mut auth_ctx);
let tasks = state
.query_ctx
.plan_sql_with_rls(&clean_sql, tenant_id, &auth_ctx, &state.shared.rls)
.await
.map_err(|e| ApiError::BadRequest(format!("SQL planning failed: {e}")))?;
if tasks.is_empty() {
return Ok(axum::Json(serde_json::json!({
"status": "ok",
"rows": [],
})));
}
let mut result_rows = Vec::new();
for task in tasks {
let required = required_permission(&task.plan);
if !identity.is_superuser
&& !identity
.roles
.iter()
.any(|r| role_grants_permission(r, required))
{
return Err(ApiError::Forbidden(format!(
"insufficient permissions for this operation (requires {required:?})"
)));
}
if task.tenant_id != tenant_id {
return Err(ApiError::Forbidden("tenant isolation violation".into()));
}
wal_append_if_write(&state, &task)?;
let response =
dispatch_to_data_plane(&state, task.tenant_id, task.vshard_id, task.plan, trace_id)
.await
.map_err(|e| ApiError::Internal(format!("dispatch failed: {e}")))?;
if response.status != Status::Ok {
let detail = response
.error_code
.as_ref()
.map(|c| format!("{c:?}"))
.unwrap_or_else(|| "unknown error".into());
return Err(ApiError::Internal(detail));
}
let payload = response.payload.as_ref();
if !payload.is_empty() {
match decode_payload_to_json(payload) {
Ok(value) => result_rows.push(value),
Err(_) => {
use base64::Engine;
let encoded = base64::engine::general_purpose::STANDARD.encode(payload);
result_rows.push(serde_json::json!({ "data": encoded }));
}
}
}
}
Ok(axum::Json(serde_json::json!({
"status": "ok",
"rows": result_rows,
})))
}
fn wal_append_if_write(
state: &AppState,
task: &crate::control::planner::physical::PhysicalTask,
) -> Result<(), ApiError> {
crate::control::server::dispatch_utils::wal_append_if_write(
&state.shared.wal,
task.tenant_id,
task.vshard_id,
&task.plan,
)
.map_err(|e| ApiError::Internal(format!("WAL append: {e}")))
}
async fn dispatch_to_data_plane(
state: &AppState,
tenant_id: crate::types::TenantId,
vshard_id: VShardId,
plan: PhysicalPlan,
trace_id: u64,
) -> crate::Result<crate::bridge::envelope::Response> {
crate::control::server::dispatch_utils::dispatch_to_data_plane(
&state.shared,
tenant_id,
vshard_id,
plan,
trace_id,
)
.await
}
fn decode_payload_to_json(payload: &[u8]) -> Result<serde_json::Value, ()> {
if let Ok(val) = rmp_serde::from_slice::<serde_json::Value>(payload) {
return Ok(val);
}
if let Ok(val) = serde_json::from_slice::<serde_json::Value>(payload) {
return Ok(val);
}
Err(())
}
fn responses_to_json(responses: Vec<pgwire::api::results::Response>) -> Vec<serde_json::Value> {
use pgwire::api::results::Response;
let mut rows = Vec::new();
for resp in responses {
match resp {
Response::Execution(tag) => {
rows.push(serde_json::json!({
"type": "execution",
"tag": format!("{:?}", tag),
}));
}
Response::Query(_) => {
rows.push(serde_json::json!({
"type": "query",
"note": "query results available via pgwire protocol",
}));
}
Response::EmptyQuery => {
rows.push(serde_json::json!({ "type": "empty" }));
}
_ => {}
}
}
rows
}
pub async fn query_ndjson(
State(state): State<AppState>,
headers: HeaderMap,
body: String,
) -> impl IntoResponse {
use axum::http::StatusCode;
use axum::response::Response;
let identity = match resolve_identity(&headers, &state, "http") {
Ok(id) => id,
Err(e) => return e.into_response(),
};
let sql = body.trim().trim_matches('"');
if sql.is_empty() {
return (StatusCode::BAD_REQUEST, "empty SQL").into_response();
}
let tenant_id = identity.tenant_id;
let query_ctx = &state.query_ctx;
let auth_ctx = crate::control::server::session_auth::build_auth_context(&identity);
let tasks = match query_ctx
.plan_sql_with_rls(sql, tenant_id, &auth_ctx, &state.shared.rls)
.await
{
Ok(t) => t,
Err(e) => return (StatusCode::BAD_REQUEST, e.to_string()).into_response(),
};
let mut ndjson = String::new();
for task in tasks {
match crate::control::server::dispatch_utils::dispatch_to_data_plane(
&state.shared,
task.tenant_id,
task.vshard_id,
task.plan,
0,
)
.await
{
Ok(resp) if !resp.payload.is_empty() => {
let json_str =
crate::data::executor::response_codec::decode_payload_to_json(&resp.payload);
if let Ok(serde_json::Value::Array(items)) =
serde_json::from_str::<serde_json::Value>(&json_str)
{
for item in &items {
ndjson.push_str(&item.to_string());
ndjson.push('\n');
}
} else {
ndjson.push_str(&json_str);
ndjson.push('\n');
}
}
Ok(_) => {}
Err(e) => {
ndjson.push_str(&serde_json::json!({"error": e.to_string()}).to_string());
ndjson.push('\n');
}
}
}
Response::builder()
.header("Content-Type", "application/x-ndjson")
.body(axum::body::Body::from(ndjson))
.unwrap_or_else(|_| (StatusCode::INTERNAL_SERVER_ERROR, "encoding error").into_response())
}