use crate::error::AppError;
use crate::extractors::tenant::TenantId;
use crate::handlers::entity::resolve_tenant_context;
use crate::response::success_one_ok;
use crate::state::AppState;
use crate::store::qualified_sys_table;
use axum::extract::{Path, State};
use axum::Json;
use serde_json::Value;
async fn validate_namespace(
pool: &sqlx::PgPool,
package_id: &str,
namespace: &str,
) -> Result<(), AppError> {
let q = qualified_sys_table("_sys_kv_stores");
let exists: Option<(String,)> = sqlx::query_as(&format!(
"SELECT id FROM {} WHERE id = $1 AND package_id = $2",
q
))
.bind(namespace)
.bind(package_id)
.fetch_optional(pool)
.await?;
exists.ok_or_else(|| {
AppError::NotFound(format!(
"kv namespace '{}' not found in package '{}'",
namespace, package_id
))
})?;
Ok(())
}
pub async fn kv_list_keys(
TenantId(tenant_id_opt): TenantId,
State(state): State<AppState>,
Path((package_id, namespace)): Path<(String, String)>,
) -> Result<impl axum::response::IntoResponse, AppError> {
let tenant_id = tenant_id_opt
.as_deref()
.filter(|s| !s.is_empty())
.ok_or_else(|| AppError::BadRequest("X-Tenant-ID header is required".into()))?;
state
.tenant_registry
.get(tenant_id)
.ok_or_else(|| AppError::NotFound(format!("tenant not found: {}", tenant_id)))?;
let _ctx = resolve_tenant_context(&state, Some(tenant_id), Some(&package_id)).await?;
let pool = &state.pool;
validate_namespace(pool, &package_id, &namespace).await?;
let q_table = qualified_sys_table("_sys_kv_data");
let sql = format!(
"SELECT key, value FROM {} WHERE tenant_id = $1 AND package_id = $2 AND namespace = $3 ORDER BY key",
q_table
);
let rows: Vec<(String, Value)> = sqlx::query_as(&sql)
.bind(tenant_id)
.bind(&package_id)
.bind(&namespace)
.fetch_all(pool)
.await?;
let data: Vec<serde_json::Value> = rows
.into_iter()
.map(|(k, v)| serde_json::json!({ "key": k, "value": v }))
.collect();
Ok(crate::response::success_many(data))
}
pub async fn kv_get(
TenantId(tenant_id_opt): TenantId,
State(state): State<AppState>,
Path((package_id, namespace, key)): Path<(String, String, String)>,
) -> Result<impl axum::response::IntoResponse, AppError> {
let tenant_id = tenant_id_opt
.as_deref()
.filter(|s| !s.is_empty())
.ok_or_else(|| AppError::BadRequest("X-Tenant-ID header is required".into()))?;
state
.tenant_registry
.get(tenant_id)
.ok_or_else(|| AppError::NotFound(format!("tenant not found: {}", tenant_id)))?;
let _ctx = resolve_tenant_context(&state, Some(tenant_id), Some(&package_id)).await?;
let pool = &state.pool;
validate_namespace(pool, &package_id, &namespace).await?;
let q_table = qualified_sys_table("_sys_kv_data");
let sql = format!(
"SELECT value FROM {} WHERE tenant_id = $1 AND package_id = $2 AND namespace = $3 AND key = $4",
q_table
);
let row: Option<(Value,)> = sqlx::query_as(&sql)
.bind(tenant_id)
.bind(&package_id)
.bind(&namespace)
.bind(&key)
.fetch_optional(pool)
.await?;
let value = row
.ok_or_else(|| AppError::NotFound(format!("kv key not found: {} / {}", namespace, key)))?
.0;
Ok(success_one_ok(value))
}
pub async fn kv_put(
TenantId(tenant_id_opt): TenantId,
State(state): State<AppState>,
Path((package_id, namespace, key)): Path<(String, String, String)>,
Json(body): Json<Value>,
) -> Result<impl axum::response::IntoResponse, AppError> {
let tenant_id = tenant_id_opt
.as_deref()
.filter(|s| !s.is_empty())
.ok_or_else(|| AppError::BadRequest("X-Tenant-ID header is required".into()))?;
state
.tenant_registry
.get(tenant_id)
.ok_or_else(|| AppError::NotFound(format!("tenant not found: {}", tenant_id)))?;
let _ctx = resolve_tenant_context(&state, Some(tenant_id), Some(&package_id)).await?;
let pool = &state.pool;
validate_namespace(pool, &package_id, &namespace).await?;
let q_table = qualified_sys_table("_sys_kv_data");
let sql = format!(
r#"
INSERT INTO {} (tenant_id, package_id, namespace, key, value, updated_at)
VALUES ($1, $2, $3, $4, $5, NOW())
ON CONFLICT (tenant_id, package_id, namespace, key)
DO UPDATE SET value = $5, updated_at = NOW()
"#,
q_table
);
sqlx::query(&sql)
.bind(tenant_id)
.bind(&package_id)
.bind(&namespace)
.bind(&key)
.bind(&body)
.execute(pool)
.await?;
Ok(success_one_ok(body))
}
pub async fn kv_delete(
TenantId(tenant_id_opt): TenantId,
State(state): State<AppState>,
Path((package_id, namespace, key)): Path<(String, String, String)>,
) -> Result<impl axum::response::IntoResponse, AppError> {
let tenant_id = tenant_id_opt
.as_deref()
.filter(|s| !s.is_empty())
.ok_or_else(|| AppError::BadRequest("X-Tenant-ID header is required".into()))?;
state
.tenant_registry
.get(tenant_id)
.ok_or_else(|| AppError::NotFound(format!("tenant not found: {}", tenant_id)))?;
let _ctx = resolve_tenant_context(&state, Some(tenant_id), Some(&package_id)).await?;
let pool = &state.pool;
validate_namespace(pool, &package_id, &namespace).await?;
let q_table = qualified_sys_table("_sys_kv_data");
let sql = format!(
"DELETE FROM {} WHERE tenant_id = $1 AND package_id = $2 AND namespace = $3 AND key = $4",
q_table
);
let result: sqlx::postgres::PgQueryResult = sqlx::query(&sql)
.bind(tenant_id)
.bind(&package_id)
.bind(&namespace)
.bind(&key)
.execute(pool)
.await?;
if result.rows_affected() == 0 {
return Err(AppError::NotFound(format!(
"kv key not found: {} / {}",
namespace, key
)));
}
Ok((axum::http::StatusCode::NO_CONTENT, ()))
}