use std::sync::Arc;
use std::time::Instant;
use axum::body::Body;
use axum::extract::{Path, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::Response;
use serde::Deserialize;
use crate::common::{AppState, check_auth, check_auth_write, redacted_error};
async fn read_body(body: Body) -> Result<String, Response> {
let bytes = match axum::body::to_bytes(body, 10 * 1024 * 1024).await {
Ok(b) => b,
Err(_) => {
return Err(json_response(
StatusCode::PAYLOAD_TOO_LARGE,
serde_json::json!({
"error": "PT404",
"message": "request body exceeds maximum allowed size (10 MiB)"
}),
));
}
};
Ok(String::from_utf8_lossy(&bytes).into_owned())
}
fn json_response(status: StatusCode, body: serde_json::Value) -> Response {
Response::builder()
.status(status)
.header("content-type", "application/json")
.body(Body::from(body.to_string()))
.expect("infallible: hardcoded valid HTTP headers")
}
pub async fn load_rules(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(rule_set): Path<String>,
body: Body,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
let rule_text = match read_body(body).await {
Ok(t) => t,
Err(r) => return r,
};
if rule_text.trim().is_empty() {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "invalid_request", "detail": "empty rule body"}),
);
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one(
"SELECT pg_ripple.load_rules($1, $2)",
&[&rule_text, &rule_set],
)
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
let msg = e.to_string();
let (cat, status) = classify_pg_error(&msg);
return redacted_error(cat, &msg, status);
}
};
let rules_loaded: i64 = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(
StatusCode::OK,
serde_json::json!({"rule_set": rule_set, "rules_loaded": rules_loaded}),
)
}
pub async fn load_builtin(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(rule_set): Path<String>,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one("SELECT pg_ripple.load_rules_builtin($1)", &[&rule_set])
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
let msg = e.to_string();
let (cat, status) = classify_pg_error(&msg);
return redacted_error(cat, &msg, status);
}
};
let rules_loaded: i64 = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(
StatusCode::OK,
serde_json::json!({"rule_set": rule_set, "rules_loaded": rules_loaded}),
)
}
pub async fn list_rules(State(state): State<Arc<AppState>>, headers: HeaderMap) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client.query_one("SELECT pg_ripple.list_rules()", &[]).await {
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"datalog_error",
&format!("list_rules failed: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
);
}
};
let result: serde_json::Value = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(StatusCode::OK, result)
}
pub async fn drop_rules(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(rule_set): Path<String>,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one("SELECT pg_ripple.drop_rules($1)", &[&rule_set])
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
let msg = e.to_string();
let (cat, status) = classify_pg_error(&msg);
return redacted_error(cat, &msg, status);
}
};
let deleted: i64 = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(StatusCode::OK, serde_json::json!({"deleted": deleted}))
}
pub async fn add_rule(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(rule_set): Path<String>,
body: Body,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
let rule_text = match read_body(body).await {
Ok(t) => t,
Err(r) => return r,
};
if rule_text.trim().is_empty() {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "invalid_request", "detail": "empty rule body"}),
);
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one(
"SELECT pg_ripple.add_rule($1, $2)",
&[&rule_set, &rule_text],
)
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
let msg = e.to_string();
let (cat, status) = classify_pg_error(&msg);
return redacted_error(cat, &msg, status);
}
};
let rule_id: i64 = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(
StatusCode::OK,
serde_json::json!({"rule_set": rule_set, "rule_id": rule_id}),
)
}
pub async fn remove_rule(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path((_rule_set, rule_id_str)): Path<(String, String)>,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
let rule_id: i64 = match rule_id_str.parse() {
Ok(id) => id,
Err(_) => {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({
"error": "invalid_request",
"detail": "rule_id must be a non-negative integer"
}),
);
}
};
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one("SELECT pg_ripple.remove_rule($1)", &[&rule_id])
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
let msg = e.to_string();
let (cat, status) = classify_pg_error(&msg);
return redacted_error(cat, &msg, status);
}
};
let removed: i64 = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(StatusCode::OK, serde_json::json!({"removed": removed}))
}
pub async fn enable_rule_set(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(rule_set): Path<String>,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
rule_set_toggle(&state, &rule_set, true).await
}
pub async fn disable_rule_set(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(rule_set): Path<String>,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
rule_set_toggle(&state, &rule_set, false).await
}
async fn rule_set_toggle(state: &AppState, rule_set: &str, enable: bool) -> Response {
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let sql = if enable {
"SELECT pg_ripple.enable_rule_set($1)"
} else {
"SELECT pg_ripple.disable_rule_set($1)"
};
if let Err(e) = client.execute(sql, &[&rule_set]).await {
state.metrics.record_error();
let msg = e.to_string();
let (cat, status) = classify_pg_error(&msg);
return redacted_error(cat, &msg, status);
}
state.metrics.record_datalog_query(start.elapsed());
json_response(
StatusCode::OK,
serde_json::json!({"rule_set": rule_set, "enabled": enable}),
)
}
pub async fn infer(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(rule_set): Path<String>,
) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one("SELECT pg_ripple.infer($1)", &[&rule_set])
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
let msg = e.to_string();
let (cat, status) = classify_pg_error(&msg);
return redacted_error(cat, &msg, status);
}
};
let derived: i64 = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(StatusCode::OK, serde_json::json!({"derived": derived}))
}
pub async fn infer_with_stats(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(rule_set): Path<String>,
) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one("SELECT pg_ripple.infer_with_stats($1)", &[&rule_set])
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
let msg = e.to_string();
let (cat, status) = classify_pg_error(&msg);
return redacted_error(cat, &msg, status);
}
};
let result: serde_json::Value = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(StatusCode::OK, result)
}
pub async fn infer_agg(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(rule_set): Path<String>,
) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one("SELECT pg_ripple.infer_agg($1)", &[&rule_set])
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
let msg = e.to_string();
let (cat, status) = classify_pg_error(&msg);
return redacted_error(cat, &msg, status);
}
};
let derived: i64 = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(StatusCode::OK, serde_json::json!({"derived": derived}))
}
pub async fn infer_wfs(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(rule_set): Path<String>,
) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one("SELECT pg_ripple.infer_wfs($1)", &[&rule_set])
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
let msg = e.to_string();
let (cat, status) = classify_pg_error(&msg);
return redacted_error(cat, &msg, status);
}
};
let derived: i64 = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(StatusCode::OK, serde_json::json!({"derived": derived}))
}
pub async fn infer_demand(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(rule_set): Path<String>,
body: Body,
) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
let body_str = match read_body(body).await {
Ok(t) => t,
Err(r) => return r,
};
let demands_json: serde_json::Value = match serde_json::from_str(&body_str) {
Ok(v) => v,
Err(e) => {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "invalid_request", "detail": format!("invalid JSON body: {e}")}),
);
}
};
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one(
"SELECT pg_ripple.infer_demand($1, $2::jsonb)",
&[&rule_set, &demands_json],
)
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
let msg = e.to_string();
let (cat, status) = classify_pg_error(&msg);
return redacted_error(cat, &msg, status);
}
};
let result: serde_json::Value = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(StatusCode::OK, result)
}
#[derive(Deserialize)]
pub struct LatticeBody {
pub lattice: String,
}
pub async fn infer_lattice(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(rule_set): Path<String>,
body: Body,
) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
let body_str = match read_body(body).await {
Ok(t) => t,
Err(r) => return r,
};
let req: LatticeBody = match serde_json::from_str(&body_str) {
Ok(v) => v,
Err(e) => {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "invalid_request", "detail": format!("expected {{\"lattice\": \"…\"}}: {e}")}),
);
}
};
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one(
"SELECT pg_ripple.infer_lattice($1, $2)",
&[&rule_set, &req.lattice],
)
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
let msg = e.to_string();
let (cat, status) = classify_pg_error(&msg);
return redacted_error(cat, &msg, status);
}
};
let derived: i64 = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(StatusCode::OK, serde_json::json!({"derived": derived}))
}
pub async fn query_goal(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(rule_set): Path<String>,
body: Body,
) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
let goal_text = match read_body(body).await {
Ok(t) => t,
Err(r) => return r,
};
if goal_text.trim().is_empty() {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "invalid_request", "detail": "empty goal body"}),
);
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one(
"SELECT pg_ripple.infer_goal($1, $2)",
&[&rule_set, &goal_text],
)
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
let msg = e.to_string();
let (cat, status) = classify_pg_goal_error(&msg);
return redacted_error(cat, &msg, status);
}
};
let result: serde_json::Value = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(StatusCode::OK, result)
}
pub async fn check_constraints_all(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
check_constraints_inner(&state, None).await
}
pub async fn check_constraints(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(rule_set): Path<String>,
) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
check_constraints_inner(&state, Some(&rule_set)).await
}
async fn check_constraints_inner(state: &AppState, rule_set: Option<&str>) -> Response {
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one("SELECT pg_ripple.check_constraints($1)", &[&rule_set])
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"datalog_error",
&format!("check_constraints failed: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
);
}
};
let result: serde_json::Value = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(StatusCode::OK, result)
}
pub async fn cache_stats(State(state): State<Arc<AppState>>, headers: HeaderMap) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one("SELECT pg_ripple.rule_plan_cache_stats()", &[])
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"datalog_error",
&format!("rule_plan_cache_stats failed: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
);
}
};
let result: serde_json::Value = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(StatusCode::OK, result)
}
pub async fn tabling_stats(State(state): State<Arc<AppState>>, headers: HeaderMap) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one("SELECT pg_ripple.tabling_stats()", &[])
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"datalog_error",
&format!("tabling_stats failed: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
);
}
};
let result: serde_json::Value = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(StatusCode::OK, result)
}
pub async fn list_lattices(State(state): State<Arc<AppState>>, headers: HeaderMap) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one("SELECT pg_ripple.list_lattices()", &[])
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"datalog_error",
&format!("list_lattices failed: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
);
}
};
let result: serde_json::Value = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(StatusCode::OK, result)
}
#[derive(Deserialize)]
pub struct CreateLatticeBody {
pub name: String,
pub join_fn: String,
pub bottom: String,
}
pub async fn create_lattice(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
body: Body,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
let body_str = match read_body(body).await {
Ok(t) => t,
Err(r) => return r,
};
let req: CreateLatticeBody = match serde_json::from_str(&body_str) {
Ok(v) => v,
Err(e) => {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "invalid_request", "detail": format!("expected {{\"name\", \"join_fn\", \"bottom\"}}: {e}")}),
);
}
};
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
if let Err(e) = client
.execute(
"SELECT pg_ripple.create_lattice($1, $2, $3)",
&[&req.name, &req.join_fn, &req.bottom],
)
.await
{
state.metrics.record_error();
return redacted_error(
"datalog_error",
&format!("create_lattice failed: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
);
}
state.metrics.record_datalog_query(start.elapsed());
json_response(
StatusCode::CREATED,
serde_json::json!({"created": req.name}),
)
}
pub async fn list_views(State(state): State<Arc<AppState>>, headers: HeaderMap) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let row = match client
.query_one("SELECT pg_ripple.list_datalog_views()", &[])
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"datalog_error",
&format!("list_datalog_views failed: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
);
}
};
let result: serde_json::Value = row.get(0);
state.metrics.record_datalog_query(start.elapsed());
json_response(StatusCode::OK, result)
}
#[derive(Deserialize)]
pub struct CreateViewBody {
pub name: String,
pub rules: Option<String>,
pub goal: String,
pub rule_set: Option<String>,
pub schedule: Option<String>,
pub decode: Option<bool>,
}
pub async fn create_view(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
body: Body,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
let body_str = match read_body(body).await {
Ok(t) => t,
Err(r) => return r,
};
let req: CreateViewBody = match serde_json::from_str(&body_str) {
Ok(v) => v,
Err(e) => {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "invalid_request", "detail": format!("invalid view definition: {e}")}),
);
}
};
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
if let Err(e) = client
.execute(
"SELECT pg_ripple.create_datalog_view($1, $2, $3, $4, $5, $6)",
&[
&req.name,
&req.rules,
&req.goal,
&req.rule_set,
&req.schedule,
&req.decode,
],
)
.await
{
state.metrics.record_error();
return redacted_error(
"datalog_error",
&format!("create_datalog_view failed: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
);
}
state.metrics.record_datalog_query(start.elapsed());
json_response(
StatusCode::CREATED,
serde_json::json!({"created": req.name}),
)
}
pub async fn drop_view(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Path(name): Path<String>,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
if let Err(e) = client
.execute("SELECT pg_ripple.drop_datalog_view($1)", &[&name])
.await
{
state.metrics.record_error();
let msg = e.to_string();
let (cat, status) = classify_pg_error(&msg);
return redacted_error(cat, &msg, status);
}
state.metrics.record_datalog_query(start.elapsed());
json_response(StatusCode::OK, serde_json::json!({"dropped": name}))
}
fn classify_pg_error(msg: &str) -> (&'static str, StatusCode) {
let lower = msg.to_lowercase();
if lower.contains("parse") || lower.contains("syntax") || lower.contains("invalid rule") {
("datalog_parse_error", StatusCode::BAD_REQUEST)
} else if lower.contains("does not exist") || lower.contains("not found") {
("rule_set_not_found", StatusCode::NOT_FOUND)
} else {
("datalog_error", StatusCode::INTERNAL_SERVER_ERROR)
}
}
fn classify_pg_goal_error(msg: &str) -> (&'static str, StatusCode) {
let lower = msg.to_lowercase();
if lower.contains("parse") || lower.contains("syntax") || lower.contains("invalid goal") {
("datalog_goal_error", StatusCode::BAD_REQUEST)
} else if lower.contains("does not exist") || lower.contains("not found") {
("rule_set_not_found", StatusCode::NOT_FOUND)
} else {
("datalog_error", StatusCode::INTERNAL_SERVER_ERROR)
}
}