use std::sync::Arc;
use axum::body::Body;
use axum::extract::{Query, State};
use axum::http::{HeaderMap, StatusCode};
use axum::response::Response;
use serde::Deserialize;
use super::sparql_handlers::json_response_http;
use crate::common::{AppState, check_auth, check_auth_write, redacted_error};
fn json_response(status: StatusCode, body: serde_json::Value) -> Response {
json_response_http(status, body)
}
#[derive(Debug, Deserialize)]
pub struct MarkTemporalBody {
pub predicate_iri: String,
#[serde(default = "default_data_model")]
pub data_model: String,
}
fn default_data_model() -> String {
"snapshot".to_owned()
}
#[derive(Debug, Deserialize)]
pub struct PointInTimeBody {
pub timestamp: Option<String>,
}
#[derive(Debug, Deserialize)]
pub struct TemporalFactsParams {
pub predicate_iri: Option<String>,
pub subject_iri: Option<String>,
}
pub(crate) async fn list_temporal_predicates(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"db_pool_error",
&e.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let rows = match client
.query(
"SELECT predicate_iri, data_model \
FROM _pg_ripple.temporal_predicates \
ORDER BY predicate_iri",
&[],
)
.await
{
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"query_error",
&e.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
);
}
};
let predicates: Vec<serde_json::Value> = rows
.iter()
.map(|row| {
serde_json::json!({
"predicate_iri": row.get::<_, String>(0),
"data_model": row.get::<_, String>(1),
})
})
.collect();
json_response(
StatusCode::OK,
serde_json::json!({ "predicates": predicates }),
)
}
pub(crate) async fn mark_temporal_predicate(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
body: Body,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
let bytes = match axum::body::to_bytes(body, 1024 * 1024).await {
Ok(b) => b,
Err(_) => {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "read_error"}),
);
}
};
let req: MarkTemporalBody = match serde_json::from_slice(&bytes) {
Ok(r) => r,
Err(e) => {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "invalid_json", "detail": format!("{e}")}),
);
}
};
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"db_pool_error",
&e.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
match client
.execute(
"SELECT pg_ripple.mark_temporal($1, $2)",
&[&req.predicate_iri, &req.data_model],
)
.await
{
Ok(_) => json_response(
StatusCode::OK,
serde_json::json!({
"status": "ok",
"predicate_iri": req.predicate_iri,
"data_model": req.data_model,
}),
),
Err(e) => {
state.metrics.record_error();
redacted_error(
"mark_temporal_error",
&e.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
)
}
}
}
pub(crate) async fn set_point_in_time(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
body: Body,
) -> Response {
if let Err(r) = check_auth_write(&state, &headers) {
return r;
}
let bytes = match axum::body::to_bytes(body, 64 * 1024).await {
Ok(b) => b,
Err(_) => {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "read_error"}),
);
}
};
let req: PointInTimeBody = if bytes.is_empty() {
PointInTimeBody { timestamp: None }
} else {
match serde_json::from_slice(&bytes) {
Ok(r) => r,
Err(e) => {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "invalid_json", "detail": format!("{e}")}),
);
}
}
};
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"db_pool_error",
&e.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let result = if let Some(ts) = &req.timestamp {
client
.execute("SELECT pg_ripple.point_in_time($1::TIMESTAMPTZ)", &[ts])
.await
} else {
client
.execute("SELECT pg_ripple.clear_point_in_time()", &[])
.await
};
match result {
Ok(_) => json_response(
StatusCode::OK,
serde_json::json!({ "status": "ok", "timestamp": req.timestamp }),
),
Err(e) => {
state.metrics.record_error();
redacted_error(
"point_in_time_error",
&e.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
)
}
}
}
pub(crate) async fn temporal_facts(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
Query(params): Query<TemporalFactsParams>,
) -> Response {
if let Err(r) = check_auth(&state, &headers) {
return r;
}
state.metrics.record_temporal_query();
let predicate_iri = match ¶ms.predicate_iri {
Some(p) => p.clone(),
None => {
return json_response(
StatusCode::BAD_REQUEST,
serde_json::json!({"error": "missing_param", "detail": "predicate_iri is required"}),
);
}
};
let client = match state.pool.get().await {
Ok(c) => c,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"db_pool_error",
&e.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
);
}
};
let (sql, rows_result) = if let Some(subject_iri) = ¶ms.subject_iri {
let s = "SELECT s_value, p_value, o_value, valid_from::TEXT, valid_to::TEXT \
FROM pg_ripple.temporal_window($1) \
WHERE s_value = $2 \
ORDER BY valid_from";
(s, client.query(s, &[&predicate_iri, subject_iri]).await)
} else {
let s = "SELECT s_value, p_value, o_value, valid_from::TEXT, valid_to::TEXT \
FROM pg_ripple.temporal_window($1) \
ORDER BY valid_from";
(s, client.query(s, &[&predicate_iri]).await)
};
let _ = sql; let rows = match rows_result {
Ok(r) => r,
Err(e) => {
state.metrics.record_error();
return redacted_error(
"temporal_facts_error",
&e.to_string(),
StatusCode::INTERNAL_SERVER_ERROR,
);
}
};
let facts: Vec<serde_json::Value> = rows
.iter()
.map(|row| {
serde_json::json!({
"subject": row.get::<_, String>(0),
"predicate": row.get::<_, String>(1),
"object": row.get::<_, String>(2),
"valid_from": row.get::<_, Option<String>>(3),
"valid_to": row.get::<_, Option<String>>(4),
})
})
.collect();
json_response(
StatusCode::OK,
serde_json::json!({ "facts": facts, "count": facts.len() }),
)
}