use axum::{
extract::{Json, Path, State},
http::{HeaderMap, StatusCode, header},
response::{IntoResponse, Html, Response},
Extension,
};
use chrono::Utc;
use serde_json::{json, Value};
use std::time::Instant;
use uuid::Uuid;
use crate::{
database::{execute_query_with_pool, get_or_create_pool, validate_query_permissions},
models::*,
};
pub async fn health_check() -> impl IntoResponse {
Json(json!({
"status": "healthy",
"service": env!("CARGO_PKG_NAME"),
"version": env!("CARGO_PKG_VERSION")
}))
}
pub async fn status_handler(State(state): State<AppState>) -> impl IntoResponse {
let instances = state.instances.read().await;
let connections = state.connections.len();
Json(json!({
"instances": instances.len(),
"active_connections": connections,
"status": "operational"
}))
}
pub async fn query_handler(
State(state): State<AppState>,
Extension(account): Extension<Account>,
headers: HeaderMap,
Json(request): Json<QueryRequest>,
) -> Result<Json<ApiResponse<QueryResult>>, StatusCode> {
let start = Instant::now();
let request_id = headers
.get("x-request-id")
.and_then(|v| v.to_str().ok())
.unwrap_or("unknown")
.to_string();
let mut metadata = ResponseMetadata {
request_id: request_id.clone(),
execution_time_ms: 0,
rows_affected: None,
instance_id: Some(account.instance_id.clone()),
timestamp: Utc::now(),
};
let db_access = account.databases.iter()
.find(|db| db.database == request.database || db.database == "*")
.ok_or(StatusCode::FORBIDDEN)?;
if let Err(e) = validate_query_permissions(&request.query, &db_access.permissions, &account.role) {
metadata.execution_time_ms = start.elapsed().as_millis();
return Ok(Json(ApiResponse::error("PERMISSION_DENIED", e, metadata)));
}
let pool_key = format!("{}-{}-{}", account.instance_id, db_access.username, request.database);
let pool = match get_or_create_pool(&state, &pool_key, &account, db_access, &request.database).await {
Ok(p) => p,
Err(e) => {
metadata.execution_time_ms = start.elapsed().as_millis();
return Ok(Json(ApiResponse::error("CONNECTION_ERROR", e, metadata)));
}
};
match execute_query_with_pool(pool, request.query, request.params).await {
Ok(result) => {
metadata.execution_time_ms = start.elapsed().as_millis();
metadata.rows_affected = Some(result.rows.len() as u64);
Ok(Json(ApiResponse::success(result, metadata)))
}
Err(e) => {
metadata.execution_time_ms = start.elapsed().as_millis();
Ok(Json(ApiResponse::error("QUERY_ERROR", e, metadata)))
}
}
}
pub async fn batch_query_handler(
State(state): State<AppState>,
Extension(account): Extension<Account>,
headers: HeaderMap,
Json(requests): Json<Vec<QueryRequest>>,
) -> Result<Json<ApiResponse<Vec<QueryResult>>>, StatusCode> {
let start = Instant::now();
let request_id = Uuid::new_v4().to_string();
let mut results = Vec::new();
let mut total_rows = 0u64;
for request in requests {
match query_handler(
State(state.clone()),
Extension(account.clone()),
headers.clone(),
Json(request),
).await {
Ok(Json(response)) => {
if let Some(data) = response.data {
total_rows += data.rows.len() as u64;
results.push(data);
}
}
Err(_) => {
}
}
}
let metadata = ResponseMetadata {
request_id,
execution_time_ms: start.elapsed().as_millis(),
rows_affected: Some(total_rows),
instance_id: Some(account.instance_id.clone()),
timestamp: Utc::now(),
};
Ok(Json(ApiResponse::success(results, metadata)))
}
pub async fn transaction_handler(
State(state): State<AppState>,
Extension(account): Extension<Account>,
headers: HeaderMap,
Json(requests): Json<Vec<QueryRequest>>,
) -> Result<Json<ApiResponse<Vec<QueryResult>>>, StatusCode> {
batch_query_handler(State(state), Extension(account), headers, Json(requests)).await
}
pub async fn list_databases(
State(state): State<AppState>,
Extension(account): Extension<Account>,
) -> Result<Json<ApiResponse<Vec<String>>>, StatusCode> {
let start = Instant::now();
let request_id = Uuid::new_v4().to_string();
let has_wildcard = account.databases.iter().any(|db| db.database == "*");
let databases: Vec<String> = if has_wildcard {
let db_access = account.databases.iter()
.find(|db| db.database == "*")
.ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
let pool_key = format!("{}-{}-postgres", account.instance_id, db_access.username);
let pool = match get_or_create_pool(&state, &pool_key, &account, db_access, "postgres").await {
Ok(p) => p,
Err(e) => {
let metadata = ResponseMetadata {
request_id,
execution_time_ms: start.elapsed().as_millis(),
rows_affected: None,
instance_id: Some(account.instance_id.clone()),
timestamp: Utc::now(),
};
return Ok(Json(ApiResponse::error("CONNECTION_ERROR", e, metadata)));
}
};
let query = "SELECT datname FROM pg_database WHERE datistemplate = false ORDER BY datname";
match execute_query_with_pool(pool, query.to_string(), vec![]).await {
Ok(result) => {
result.rows.into_iter()
.filter_map(|row| row.get("datname").and_then(|v| v.as_str()).map(String::from))
.collect()
}
Err(e) => {
let metadata = ResponseMetadata {
request_id,
execution_time_ms: start.elapsed().as_millis(),
rows_affected: None,
instance_id: Some(account.instance_id.clone()),
timestamp: Utc::now(),
};
return Ok(Json(ApiResponse::error("QUERY_ERROR", e, metadata)));
}
}
} else {
account.databases.iter()
.map(|db| db.database.clone())
.collect()
};
let metadata = ResponseMetadata {
request_id,
execution_time_ms: start.elapsed().as_millis(),
rows_affected: Some(databases.len() as u64),
instance_id: Some(account.instance_id.clone()),
timestamp: Utc::now(),
};
Ok(Json(ApiResponse::success(databases, metadata)))
}
pub async fn create_database(
State(_state): State<AppState>,
Extension(account): Extension<Account>,
Json(_request): Json<serde_json::Map<String, Value>>,
) -> Result<Json<ApiResponse<Value>>, StatusCode> {
let request_id = Uuid::new_v4().to_string();
let metadata = ResponseMetadata {
request_id,
execution_time_ms: 0,
rows_affected: None,
instance_id: Some(account.instance_id.clone()),
timestamp: Utc::now(),
};
if account.role != AccountRole::Owner && account.role != AccountRole::Admin {
return Ok(Json(ApiResponse::error(
"PERMISSION_DENIED",
"Only Owner or Admin can create databases".to_string(),
metadata,
)));
}
Ok(Json(ApiResponse::success(json!({"created": true}), metadata)))
}
pub async fn drop_database(
State(_state): State<AppState>,
Extension(account): Extension<Account>,
Path(name): Path<String>,
) -> Result<Json<ApiResponse<Value>>, StatusCode> {
let request_id = Uuid::new_v4().to_string();
let metadata = ResponseMetadata {
request_id,
execution_time_ms: 0,
rows_affected: None,
instance_id: Some(account.instance_id.clone()),
timestamp: Utc::now(),
};
if account.role != AccountRole::Owner {
return Ok(Json(ApiResponse::error(
"PERMISSION_DENIED",
"Only Owner can drop databases".to_string(),
metadata,
)));
}
Ok(Json(ApiResponse::success(json!({"dropped": name}), metadata)))
}
pub async fn list_tables(
State(state): State<AppState>,
Extension(account): Extension<Account>,
Path(db): Path<String>,
) -> Result<Json<ApiResponse<Vec<Value>>>, StatusCode> {
let start = Instant::now();
let request_id = Uuid::new_v4().to_string();
if !account.databases.iter().any(|d| d.database == db || d.database == "*") {
let metadata = ResponseMetadata {
request_id,
execution_time_ms: start.elapsed().as_millis(),
rows_affected: None,
instance_id: Some(account.instance_id.clone()),
timestamp: Utc::now(),
};
return Ok(Json(ApiResponse::error(
"FORBIDDEN",
format!("No access to database: {db}"),
metadata,
)));
}
let query_req = QueryRequest {
query: "SELECT schemaname as schema, tablename as name, 'table' as type, tableowner as owner FROM pg_tables WHERE schemaname NOT IN ('pg_catalog', 'information_schema') ORDER BY schemaname, tablename".to_string(),
database: db,
params: vec![],
options: QueryOptions::default(),
};
match query_handler(State(state), Extension(account.clone()), HeaderMap::new(), Json(query_req)).await {
Ok(Json(response)) => {
if let Some(data) = response.data {
let metadata = ResponseMetadata {
request_id,
execution_time_ms: start.elapsed().as_millis(),
rows_affected: Some(data.rows.len() as u64),
instance_id: Some(account.instance_id),
timestamp: Utc::now(),
};
Ok(Json(ApiResponse::success(data.rows, metadata)))
} else {
Ok(Json(ApiResponse { success: false, data: None, error: response.error, metadata: response.metadata }))
}
}
Err(e) => Err(e),
}
}
pub async fn get_schema(
State(_state): State<AppState>,
Extension(account): Extension<Account>,
Path(_db): Path<String>,
) -> Result<Json<ApiResponse<Value>>, StatusCode> {
let request_id = Uuid::new_v4().to_string();
let metadata = ResponseMetadata {
request_id,
execution_time_ms: 0,
rows_affected: None,
instance_id: Some(account.instance_id.clone()),
timestamp: Utc::now(),
};
Ok(Json(ApiResponse::success(json!({}), metadata)))
}
pub async fn get_account_info(
State(state): State<AppState>,
Extension(account): Extension<Account>,
) -> Result<Json<ApiResponse<Value>>, StatusCode> {
let metadata = ResponseMetadata {
request_id: Uuid::new_v4().to_string(),
execution_time_ms: 0,
rows_affected: None,
instance_id: Some(account.instance_id.clone()),
timestamp: Utc::now(),
};
let active_connections = state.active_connections
.get(&account.id)
.map(|entry| *entry.value())
.unwrap_or(0);
let info = json!({
"id": account.id,
"name": account.name,
"role": account.role,
"databases": account.databases.iter().map(|db| db.database.clone()).collect::<Vec<_>>(),
"rate_limit": account.rate_limit,
"max_connections": account.max_connections,
"active_connections": active_connections,
});
Ok(Json(ApiResponse::success(info, metadata)))
}
pub async fn get_usage_stats(
Extension(account): Extension<Account>,
) -> Result<Json<ApiResponse<Value>>, StatusCode> {
let metadata = ResponseMetadata {
request_id: Uuid::new_v4().to_string(),
execution_time_ms: 0,
rows_affected: None,
instance_id: Some(account.instance_id.clone()),
timestamp: Utc::now(),
};
let stats = json!({
"period": Utc::now().format("%Y-%m-%d").to_string(),
"queries_today": 0,
"queries_this_month": 0,
"data_transferred_mb": 0.0,
"active_connections": 0,
"peak_connections": 0,
"errors_today": 0,
"average_query_time_ms": 0.0,
});
Ok(Json(ApiResponse::success(stats, metadata)))
}
pub async fn serve_openapi() -> impl IntoResponse {
let openapi_spec = serde_json::json!({
"openapi": "3.0.0",
"info": {
"title": "PostgreSQL API",
"version": "0.1.0",
"description": "High-performance PostgreSQL REST API driver"
},
"servers": [
{
"url": "/",
"description": "Current server"
}
],
"paths": {
"/health": {
"get": {
"summary": "Health check",
"responses": {
"200": {
"description": "Service is healthy"
}
}
}
},
"/v1/query": {
"post": {
"summary": "Execute a SQL query",
"security": [{"bearerAuth": []}],
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/QueryRequest"
}
}
}
},
"responses": {
"200": {
"description": "Query executed successfully"
}
}
}
}
},
"components": {
"schemas": {
"QueryRequest": {
"type": "object",
"required": ["query"],
"properties": {
"query": {
"type": "string",
"description": "SQL query to execute"
},
"params": {
"type": "array",
"items": {},
"description": "Query parameters"
},
"database": {
"type": "string",
"description": "Target database"
}
}
}
},
"securitySchemes": {
"bearerAuth": {
"type": "http",
"scheme": "bearer"
}
}
}
});
Json(openapi_spec)
}
pub async fn serve_docs() -> impl IntoResponse {
Html(r#"<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>pg-api Documentation</title>
<link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5.9/swagger-ui.css">
<style>
body {
margin: 0;
padding: 0;
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
}
.header {
background-color: #1a1a1a;
color: white;
padding: 1rem 2rem;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.header h1 {
margin: 0;
font-size: 1.5rem;
font-weight: 500;
}
.header p {
margin: 0.5rem 0 0 0;
opacity: 0.8;
font-size: 0.9rem;
}
#swagger-ui {
margin-top: 0;
}
.swagger-ui .topbar {
display: none;
}
</style>
</head>
<body>
<div class="header">
<h1>pg-api Documentation</h1>
<p>PostgreSQL API Service - RESTful API for database operations</p>
</div>
<div id="swagger-ui"></div>
<script src="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5.9/swagger-ui-bundle.js"></script>
<script src="https://cdn.jsdelivr.net/npm/swagger-ui-dist@5.9/swagger-ui-standalone-preset.js"></script>
<script>
window.onload = function() {
window.ui = SwaggerUIBundle({
url: "/openapi.json",
dom_id: '#swagger-ui',
deepLinking: true,
presets: [
SwaggerUIBundle.presets.apis,
SwaggerUIStandalonePreset
],
plugins: [
SwaggerUIBundle.plugins.DownloadUrl
],
layout: "StandaloneLayout",
persistAuthorization: true,
tryItOutEnabled: true,
supportedSubmitMethods: ['get', 'post', 'put', 'delete', 'patch'],
onComplete: function() {
console.log("Swagger UI loaded");
}
});
};
</script>
</body>
</html>"#)
}