pub use crate::handlers::sparql::content_types::*;
pub use crate::handlers::sparql::core::*;
pub use crate::handlers::sparql::optimizers::*;
pub use crate::handlers::sparql::aggregation_engine::{
AggregationFunction, EnhancedAggregationProcessor,
};
pub use crate::handlers::sparql::bind_processor::{EnhancedBindProcessor, EnhancedValuesProcessor};
pub use crate::handlers::sparql::service_delegation::{
ParallelServiceExecutor, ServiceDelegationManager, ServiceResultMerger,
};
pub use crate::handlers::sparql::sparql12_features::{AggregationEngine, Sparql12Features};
use crate::{
auth::AuthUser,
error::{FusekiError, FusekiResult},
server::AppState,
};
use axum::{
extract::{Query, State},
http::HeaderMap,
response::IntoResponse,
Form,
};
use std::sync::Arc;
pub async fn query_handler_get(
Query(params): Query<SparqlQueryParams>,
State(state): State<Arc<AppState>>,
headers: HeaderMap,
user: Option<AuthUser>,
) -> impl IntoResponse {
crate::handlers::sparql::core::sparql_query(Query(params), State(state), headers, user).await
}
pub async fn query_handler_post(
State(state): State<Arc<AppState>>,
headers: HeaderMap,
user: Option<AuthUser>,
body: axum::body::Bytes,
) -> impl IntoResponse {
let content_type = headers
.get("content-type")
.and_then(|h| h.to_str().ok())
.unwrap_or("");
tracing::debug!("POST /sparql request with content-type: {}", content_type);
let params = if content_type.contains("application/x-www-form-urlencoded") {
let body_str = String::from_utf8_lossy(&body);
let mut query = None;
let mut default_graph_uri = None;
let mut named_graph_uri = None;
for part in body_str.split('&') {
if let Some((key, value)) = part.split_once('=') {
let decoded_value = urlencoding::decode(value).unwrap_or_default().to_string();
match key {
"query" => query = Some(decoded_value),
"default-graph-uri" => {
default_graph_uri = Some(vec![decoded_value]);
}
"named-graph-uri" => {
named_graph_uri = Some(vec![decoded_value]);
}
_ => {}
}
}
}
SparqlQueryParams {
query,
default_graph_uri,
named_graph_uri,
timeout: None,
format: None,
}
} else if content_type.contains("application/sparql-query") {
let query_string = String::from_utf8_lossy(&body).to_string();
SparqlQueryParams {
query: Some(query_string),
default_graph_uri: None,
named_graph_uri: None,
timeout: None,
format: None,
}
} else {
tracing::debug!("Invalid content type detected: {}", content_type);
return FusekiError::bad_request(format!(
"Unsupported content type: {content_type}. Expected 'application/sparql-query' or 'application/x-www-form-urlencoded'"
)).into_response();
};
crate::handlers::sparql::core::sparql_query(Query(params), State(state), headers, user)
.await
.into_response()
}
pub async fn query_handler(
Query(params): Query<SparqlQueryParams>,
State(state): State<Arc<AppState>>,
headers: HeaderMap,
user: Option<AuthUser>,
) -> impl IntoResponse {
crate::handlers::sparql::core::sparql_query(Query(params), State(state), headers, user).await
}
pub async fn update_handler(
State(state): State<Arc<AppState>>,
Form(params): Form<SparqlUpdateParams>,
) -> impl IntoResponse {
use axum::Json;
let context = QueryContext::default();
match execute_sparql_update(¶ms.update, context, &state).await {
Ok(result) => Json(serde_json::json!({
"success": true,
"message": "Update executed successfully",
"modified_count": result.affected_triples.unwrap_or(0)
}))
.into_response(),
Err(e) => (
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
Json(serde_json::json!({
"error": "update_execution_failed",
"message": e.to_string()
})),
)
.into_response(),
}
}
#[derive(Debug, Clone)]
pub struct EnhancedSparqlService {
pub sparql12_features: Sparql12Features,
pub service_delegator: ServiceDelegationManager,
pub aggregation_processor: EnhancedAggregationProcessor,
pub bind_processor: EnhancedBindProcessor,
pub values_processor: EnhancedValuesProcessor,
pub content_negotiator: ContentNegotiator,
pub injection_detector: InjectionDetector,
pub complexity_analyzer: ComplexityAnalyzer,
pub performance_optimizer: PerformanceOptimizer,
}
impl EnhancedSparqlService {
pub fn new() -> Self {
Self {
sparql12_features: Sparql12Features::new(),
service_delegator: ServiceDelegationManager::new(),
aggregation_processor: EnhancedAggregationProcessor::new(),
bind_processor: EnhancedBindProcessor::new(),
values_processor: EnhancedValuesProcessor::new(),
content_negotiator: ContentNegotiator::new(),
injection_detector: InjectionDetector::new(),
complexity_analyzer: ComplexityAnalyzer::new(),
performance_optimizer: PerformanceOptimizer::new(),
}
}
pub async fn process_query(
&mut self,
query: &str,
_context: QueryContext,
_headers: &HeaderMap,
) -> FusekiResult<String> {
if self.injection_detector.detect_injection(query)? {
return Err(FusekiError::authorization(
"Potential SPARQL injection detected",
));
}
let complexity = self.complexity_analyzer.analyze_complexity(query)?;
if complexity.is_complex {
tracing::warn!("Complex query detected: {:?}", complexity);
}
let mut optimized_query = query.to_string();
optimized_query = self
.sparql12_features
.optimize_query(&optimized_query)
.await?;
optimized_query = self.performance_optimizer.optimize(&optimized_query)?;
if optimized_query.to_uppercase().contains("SERVICE") {
optimized_query = self
.service_delegator
.process_service_clauses(&optimized_query)
.await?;
}
if contains_aggregation_functions(&optimized_query) {
optimized_query = self
.aggregation_processor
.process_aggregations(&optimized_query)?;
}
if optimized_query.to_uppercase().contains("BIND") {
optimized_query = self.bind_processor.process_bind_clauses(&optimized_query)?;
}
if optimized_query.to_uppercase().contains("VALUES") {
optimized_query = self
.values_processor
.process_values_clauses(&optimized_query)?;
}
Ok(optimized_query)
}
pub async fn process_update(
&mut self,
update: &str,
_context: QueryContext,
) -> FusekiResult<String> {
if self.injection_detector.detect_injection(update)? {
return Err(FusekiError::authorization(
"Potential SPARQL injection detected in update",
));
}
let optimized_update = self.performance_optimizer.optimize(update)?;
Ok(optimized_update)
}
pub fn negotiate_content_type(&self, headers: &HeaderMap) -> String {
self.content_negotiator.negotiate(headers)
}
pub fn format_response<T: serde::Serialize + std::fmt::Debug>(
&self,
data: &T,
content_type: &str,
) -> FusekiResult<String> {
ResponseFormatter::format(data, content_type)
.map_err(|e| FusekiError::response_formatting(e.to_string()))
}
}
impl Default for EnhancedSparqlService {
fn default() -> Self {
Self::new()
}
}
pub fn validate_sparql_query(query: &str) -> FusekiResult<()> {
if query.trim().is_empty() {
return Err(crate::error::FusekiError::bad_request(
"Empty query".to_string(),
));
}
Ok(())
}
pub fn contains_aggregation_functions(query: &str) -> bool {
let upper = query.to_uppercase();
[
"COUNT",
"SUM",
"AVG",
"MIN",
"MAX",
"GROUP_CONCAT",
"SAMPLE",
]
.iter()
.any(|func| upper.contains(&format!("{func}(")))
}
pub fn contains_sparql_star_features(query: &str) -> bool {
let upper = query.to_uppercase();
let has_quoted_triples = upper.contains("<<") && upper.contains(">>");
let has_star_functions = upper.contains("SUBJECT(")
|| upper.contains("PREDICATE(")
|| upper.contains("OBJECT(")
|| upper.contains("ISTRIPLE(");
let has_annotations = query.contains("{|") && query.contains("|}");
has_quoted_triples || has_star_functions || has_annotations
}
pub fn contains_property_paths(query: &str) -> bool {
let chars: Vec<char> = query.chars().collect();
for i in 0..chars.len() {
match chars[i] {
'/'
if i > 0
&& chars[i - 1].is_alphanumeric()
&& i + 1 < chars.len()
&& chars[i + 1].is_alphanumeric()
=> {
return true;
}
'|'
if i > 0
&& chars[i - 1].is_alphanumeric()
&& i + 1 < chars.len()
&& chars[i + 1].is_alphanumeric()
=> {
return true;
}
'+'
if i > 0 && (chars[i - 1].is_alphanumeric() || chars[i - 1] == ':') => {
return true;
}
'*'
if i > 0 && (chars[i - 1].is_alphanumeric() || chars[i - 1] == ':') => {
return true;
}
_ => {}
}
}
false
}
pub fn contains_subqueries(query: &str) -> bool {
let upper = query.to_uppercase();
upper.matches("SELECT").count() > 1 || upper.contains("ASK") || upper.contains("CONSTRUCT")
}
pub fn contains_bind_values(query: &str) -> bool {
let upper = query.to_uppercase();
upper.contains("BIND(") || upper.contains("VALUES")
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_enhanced_sparql_service() {
let mut service = EnhancedSparqlService::new();
let context = QueryContext::default();
let headers = HeaderMap::new();
let query = "SELECT ?s WHERE { ?s ?p ?o }";
let result = service.process_query(query, context, &headers).await;
assert!(result.is_ok());
}
#[test]
fn test_query_validation() {
assert!(validate_sparql_query("SELECT ?s WHERE { ?s ?p ?o }").is_ok());
assert!(validate_sparql_query("").is_err());
}
#[test]
fn test_feature_detection() {
assert!(contains_aggregation_functions(
"SELECT (COUNT(?s) as ?count) WHERE { ?s ?p ?o }"
));
assert!(!contains_aggregation_functions(
"SELECT ?s WHERE { ?s ?p ?o }"
));
assert!(contains_property_paths(
"SELECT ?s WHERE { ?s foaf:knows+ ?friend }"
));
assert!(!contains_property_paths(
"SELECT ?s WHERE { ?s foaf:knows ?friend }"
));
assert!(contains_sparql_star_features(
"SELECT ?s WHERE { <<?s ?p ?o>> ?meta ?value }"
));
assert!(!contains_sparql_star_features(
"SELECT ?s WHERE { ?s ?p ?o }"
));
}
#[test]
fn test_content_negotiation() {
let service = EnhancedSparqlService::new();
let mut headers = HeaderMap::new();
headers.insert("accept", "application/json".parse().unwrap());
let content_type = service.negotiate_content_type(&headers);
assert_eq!(content_type, "application/json");
}
}