use std::sync::Arc;
use std::time::Instant;
use axum::body::Body;
use axum::extract::State;
use axum::http::{HeaderMap, StatusCode};
use axum::response::Response;
use serde::Deserialize;
use super::sparql_handlers::json_response_http;
use crate::common::{AppState, check_auth_write, redacted_error};
fn json_response(status: StatusCode, body: serde_json::Value) -> Response {
json_response_http(status, body)
}
#[derive(Debug, Deserialize)]
pub struct ResolveEntitiesBody {
pub source_graph: String,
pub target_graph: String,
pub options: Option<serde_json::Value>,
}
#[derive(Debug, Deserialize)]
pub struct EvaluateResolutionBody {
pub gold_graph: String,
pub pipeline_options: Option<serde_json::Value>,
}
pub(crate) async fn entity_resolution_resolve(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
body: Body,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
let bytes = match axum::body::to_bytes(body, 1024 * 1024).await {
Ok(b) => b,
Err(_) => {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "read_error"}),
);
}
};
let req: ResolveEntitiesBody = match serde_json::from_slice(&bytes) {
Ok(r) => r,
Err(e) => {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "invalid_json", "detail": format!("{e}")}),
);
}
};
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"db_pool_error",
&e.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let start = Instant::now();
let options_json = req
.options
.map(|v| v.to_string())
.unwrap_or_else(|| "null".to_owned());
let row = match client
.query_one(
"SELECT pg_ripple.resolve_entities($1, $2, $3::json)",
&[&req.source_graph, &req.target_graph, &options_json],
)
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"entity_resolution_error",
&e.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
);
}
};
let elapsed = start.elapsed();
state
.metrics
.record_er_stage_duration("canonicalization", elapsed);
let result: serde_json::Value = row.get::<_, serde_json::Value>(0);
json_response(StatusCode::OK, result)
}
pub(crate) async fn entity_resolution_evaluate(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
body: Body,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
let bytes = match axum::body::to_bytes(body, 1024 * 1024).await {
Ok(b) => b,
Err(_) => {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "read_error"}),
);
}
};
let req: EvaluateResolutionBody = match serde_json::from_slice(&bytes) {
Ok(r) => r,
Err(e) => {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "invalid_json", "detail": format!("{e}")}),
);
}
};
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"db_pool_error",
&e.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let opts_json = req
.pipeline_options
.map(|v| v.to_string())
.unwrap_or_else(|| "{}".to_owned());
let row = match client
.query_one(
"SELECT pg_ripple.evaluate_resolution($1, $2::jsonb)",
&[&req.gold_graph, &opts_json],
)
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"evaluate_resolution_error",
&e.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
);
}
};
let result: serde_json::Value = row.get::<_, serde_json::Value>(0);
json_response(StatusCode::OK, result)
}
pub(crate) async fn entity_resolution_monitoring_enable(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"db_pool_error",
&e.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
match client
.execute("SELECT pg_ripple.enable_er_monitoring()", &[])
.await
{
Ok(_) => json_response(
StatusCode::OK,
serde_json::json!({"status": "monitoring_enabled"}),
),
Err(e) => {
state.metrics.record_error();
redacted_error(
"er_monitoring_error",
&e.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
)
}
}
}
pub(crate) async fn entity_resolution_monitoring_disable(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"db_pool_error",
&e.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
match client
.execute("SELECT pg_ripple.disable_er_monitoring()", &[])
.await
{
Ok(_) => json_response(
StatusCode::OK,
serde_json::json!({"status": "monitoring_disabled"}),
),
Err(e) => {
state.metrics.record_error();
redacted_error(
"er_monitoring_error",
&e.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
)
}
}
}