use std::sync::Arc;
use std::time::Instant;
use axum::body::Body;
use axum::extract::{Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::Response;
use serde::Deserialize;
use super::sparql_handlers::json_response_http;
use crate::common::{AppState, check_auth, check_auth_write, redacted_error};
fn json_response(status: StatusCode, body: serde_json::Value) -> Response {
json_response_http(status, body)
}
async fn read_body(body: Body) -> Result<String, Response> {
let bytes = match axum::body::to_bytes(body, 64 * 1024 * 1024).await {
Ok(b) => b,
Err(e) => {
return Err(json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "read_error", "detail": format!("{e}")}),
));
}
};
String::from_utf8(bytes.to_vec()).map_err(|_| {
json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "invalid_utf8", "detail": "request body is not valid UTF-8"}),
)
})
}
#[derive(Debug, Deserialize)]
pub struct LoadConfidenceParams {
#[serde(default = "default_confidence")]
pub confidence: f64,
#[serde(default = "default_format")]
pub format: String,
pub graph_uri: Option<String>,
}
fn default_confidence() -> f64 {
1.0
}
fn default_format() -> String {
"ntriples".to_owned()
}
#[derive(Debug, Deserialize)]
pub struct ShaclParams {
pub graph: String,
}
pub(crate) async fn load_with_confidence(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Query(params): Query<LoadConfidenceParams>,
body: Body,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
let data = match read_body(body).await {
Ok(d) => d,
Err(r) => return r,
};
if !(0.0..=1.0).contains(¶ms.confidence) {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({
"error": "invalid_confidence",
"detail": "confidence must be in [0.0, 1.0]"
}),
);
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let sql = "SELECT pg_ripple.load_triples_with_confidence($1, $2, $3, $4)";
let rows = client
.query(
sql,
&[
&data,
¶ms.confidence,
¶ms.format.as_str(),
¶ms.graph_uri.as_deref(),
],
)
.await;
match rows {
Ok(rows) => {
let n: i64 = rows.first().and_then(|r| r.try_get(0).ok()).unwrap_or(0);
json_response(
StatusCode::OK,
serde_json::json!({
"triples_loaded": n,
"confidence": params.confidence,
"elapsed_ms": start.elapsed().as_millis()
}),
)
}
Err(e) => {
state.metrics.record_error();
redacted_error(
"load_error",
&format!("load_triples_with_confidence failed: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
)
}
}
}
pub(crate) async fn shacl_score(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Query(params): Query<ShaclParams>,
) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
match client
.query_one(
"SELECT pg_ripple.shacl_score($1)",
&[¶ms.graph.as_str()],
)
.await
{
Ok(row) => {
let score: f64 = row.try_get(0).unwrap_or(1.0);
json_response(
StatusCode::OK,
serde_json::json!({
"graph": params.graph,
"score": score,
"elapsed_ms": start.elapsed().as_millis()
}),
)
}
Err(e) => {
state.metrics.record_error();
redacted_error(
"shacl_score_error",
&format!("shacl_score failed: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
)
}
}
}
pub(crate) async fn shacl_report_scored(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Query(params): Query<ShaclParams>,
) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
match client
.query(
"SELECT focus_node, shape_iri, result_severity, result_severity_score, message \
FROM pg_ripple.shacl_report_scored($1)",
&[¶ms.graph.as_str()],
)
.await
{
Ok(rows) => {
let violations: Vec<serde_json::Value> = rows
.iter()
.map(|row| {
serde_json::json!({
"focusNode": row.try_get::<_, &str>(0).unwrap_or(""),
"shapeIRI": row.try_get::<_, &str>(1).unwrap_or(""),
"severity": row.try_get::<_, &str>(2).unwrap_or(""),
"score": row.try_get::<_, f64>(3).unwrap_or(1.0),
"message": row.try_get::<_, &str>(4).unwrap_or("")
})
})
.collect();
json_response(
StatusCode::OK,
serde_json::json!({
"graph": params.graph,
"violations": violations,
"elapsed_ms": start.elapsed().as_millis()
}),
)
}
Err(e) => {
state.metrics.record_error();
redacted_error(
"shacl_report_error",
&format!("shacl_report_scored failed: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
)
}
}
}
pub(crate) async fn vacuum_confidence(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
let start = Instant::now();
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"service_unavailable",
&format!("pool error: {e}"),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
match client
.query_one("SELECT pg_ripple.vacuum_confidence()", &[])
.await
{
Ok(row) => {
let deleted: i64 = row.try_get(0).unwrap_or(0);
json_response(
StatusCode::OK,
serde_json::json!({
"deleted": deleted,
"elapsed_ms": start.elapsed().as_millis()
}),
)
}
Err(e) => {
state.metrics.record_error();
redacted_error(
"vacuum_confidence_error",
&format!("vacuum_confidence failed: {e}"),
StatusCode::INTERNAL_SERVER_ERROR,
)
}
}
}