use axum::{
extract::{Path, Query, Request, State},
http::StatusCode,
middleware::Next,
response::{IntoResponse, Json, Response},
routing::{delete, get, post},
Router,
};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, Instant};
use super::types::{AppState, RequestIdGenerator, Runtime};
use crate::error::FusekiError;
use crate::handlers;
use crate::metrics::RequestMetrics;
use tracing::{debug, error};
type MakeRequestUuid = RequestIdGenerator;
async fn error_handling_middleware(
State(_state): State<Arc<AppState>>,
request: Request,
next: Next,
) -> Response {
let response = next.run(request).await;
if response.status().is_server_error() {
error!("Server error response: {:?}", response.status());
} else if response.status().is_client_error() {
debug!("Client error response: {:?}", response.status());
}
response
}
async fn auth_middleware(
State(state): State<Arc<AppState>>,
request: Request,
next: Next,
) -> Response {
let path = request.uri().path();
if path.starts_with("/health") || path == "/$/ping" {
return next.run(request).await;
}
if let Some(auth_service) = &state.auth_service {
if let Some(auth_header) = request.headers().get("Authorization") {
if let Ok(auth_str) = auth_header.to_str() {
if let Some(token) = auth_str.strip_prefix("Bearer ") {
match auth_service.validate_jwt_token(token) {
Ok(_validation) => return next.run(request).await,
Err(_) => {
return (StatusCode::UNAUTHORIZED, "Invalid token").into_response();
}
}
}
}
}
return (StatusCode::UNAUTHORIZED, "Authorization required").into_response();
}
next.run(request).await
}
async fn metrics_middleware(
State(state): State<Arc<AppState>>,
request: Request,
next: Next,
) -> Response {
let start_time = Instant::now();
let method = request.method().clone();
let path = request.uri().path().to_string();
let response = next.run(request).await;
if let Some(metrics_service) = &state.metrics_service {
let request_metrics = RequestMetrics {
method: method.to_string(),
path,
status: response.status().as_u16(),
duration: start_time.elapsed(),
bytes_sent: 0,
bytes_received: 0,
};
metrics_service.record_request(request_metrics).await;
}
response
}
#[cfg(feature = "rate-limit")]
async fn rate_limiting_middleware(request: Request, next: Next) -> Response {
let client_key = extract_client_identifier(&request);
let response = next.run(request).await;
response
}
#[cfg(feature = "rate-limit")]
fn extract_client_identifier(request: &Request) -> String {
if let Some(forwarded_for) = request.headers().get("x-forwarded-for") {
if let Ok(forwarded_str) = forwarded_for.to_str() {
if let Some(ip) = forwarded_str.split(',').next() {
return ip.trim().to_string();
}
}
}
if let Some(real_ip) = request.headers().get("x-real-ip") {
if let Ok(ip_str) = real_ip.to_str() {
return ip_str.to_string();
}
}
"unknown".to_string()
}
pub async fn health_handler(State(state): State<Arc<AppState>>) -> Json<serde_json::Value> {
if let Some(metrics_service) = &state.metrics_service {
let health_status = metrics_service.get_health_status().await;
Json(serde_json::to_value(health_status).unwrap_or_default())
} else {
Json(serde_json::json!(
{ "status" : "healthy", "version" : env!("CARGO_PKG_VERSION"),
"timestamp" : chrono::Utc::now() }
))
}
}
pub async fn logs_get_handler(
params: Query<handlers::request_log::LogQuery>,
State(state): State<Arc<AppState>>,
) -> Result<axum::response::Response, handlers::request_log::LogError> {
handlers::get_logs(params, State(state.request_logger.clone())).await
}
pub async fn logs_statistics_handler(
State(state): State<Arc<AppState>>,
) -> Result<axum::response::Response, handlers::request_log::LogError> {
handlers::get_log_statistics(State(state.request_logger.clone())).await
}
pub async fn logs_clear_handler(
State(state): State<Arc<AppState>>,
) -> Result<axum::response::Response, handlers::request_log::LogError> {
handlers::clear_logs(State(state.request_logger.clone())).await
}
pub async fn logs_config_get_handler(
State(state): State<Arc<AppState>>,
) -> Result<axum::response::Response, handlers::request_log::LogError> {
handlers::get_log_config(State(state.request_logger.clone())).await
}
pub async fn logs_config_update_handler(
State(state): State<Arc<AppState>>,
Json(config): Json<handlers::request_log::LoggerConfig>,
) -> Result<axum::response::Response, handlers::request_log::LogError> {
handlers::update_log_config(State(state.request_logger.clone()), Json(config)).await
}
pub async fn stats_server_handler(
State(state): State<Arc<AppState>>,
) -> Result<axum::response::Response, handlers::dataset_stats::StatsError> {
handlers::get_server_stats(State(Arc::new(state.store.clone()))).await
}
pub async fn stats_dataset_handler(
Path(dataset): Path<String>,
State(state): State<Arc<AppState>>,
) -> Result<axum::response::Response, handlers::dataset_stats::StatsError> {
handlers::get_dataset_stats(Path(dataset), State(Arc::new(state.store.clone()))).await
}
pub async fn task_list_handler(
State(state): State<Arc<AppState>>,
) -> Result<axum::response::Response, handlers::tasks::TaskError> {
handlers::list_tasks(State(state.task_manager.clone())).await
}
pub async fn task_get_handler(
Path(task_id): axum::extract::Path<String>,
State(state): State<Arc<AppState>>,
) -> Result<axum::response::Response, handlers::tasks::TaskError> {
handlers::get_task(Path(task_id), State(state.task_manager.clone())).await
}
pub async fn task_create_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<handlers::tasks::CreateTaskRequest>,
) -> Result<axum::response::Response, handlers::tasks::TaskError> {
handlers::create_task(State(state.task_manager.clone()), Json(req)).await
}
pub async fn task_delete_handler(
Path(task_id): axum::extract::Path<String>,
State(state): State<Arc<AppState>>,
) -> Result<axum::response::Response, handlers::tasks::TaskError> {
handlers::delete_task(Path(task_id), State(state.task_manager.clone())).await
}
pub async fn task_cancel_handler(
Path(task_id): axum::extract::Path<String>,
State(state): State<Arc<AppState>>,
) -> Result<axum::response::Response, handlers::tasks::TaskError> {
handlers::cancel_task(Path(task_id), State(state.task_manager.clone())).await
}
pub async fn task_statistics_handler(
State(state): State<Arc<AppState>>,
) -> Result<axum::response::Response, handlers::tasks::TaskError> {
handlers::get_task_statistics(State(state.task_manager.clone())).await
}
pub async fn prefix_list_handler(
State(state): State<Arc<AppState>>,
) -> Result<axum::response::Response, handlers::prefixes::PrefixError> {
handlers::list_prefixes(State(state.prefix_store.clone())).await
}
pub async fn prefix_get_handler(
Path(prefix): axum::extract::Path<String>,
State(state): State<Arc<AppState>>,
) -> Result<axum::response::Response, handlers::prefixes::PrefixError> {
handlers::get_prefix(Path(prefix), State(state.prefix_store.clone())).await
}
pub async fn prefix_add_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<handlers::prefixes::PrefixRequest>,
) -> Result<axum::response::Response, handlers::prefixes::PrefixError> {
handlers::add_prefix(State(state.prefix_store.clone()), Json(req)).await
}
pub async fn prefix_update_handler(
Path(prefix): axum::extract::Path<String>,
State(state): State<Arc<AppState>>,
Json(req): Json<serde_json::Value>,
) -> Result<axum::response::Response, handlers::prefixes::PrefixError> {
handlers::update_prefix(Path(prefix), State(state.prefix_store.clone()), Json(req)).await
}
pub async fn prefix_delete_handler(
Path(prefix): axum::extract::Path<String>,
State(state): State<Arc<AppState>>,
) -> Result<axum::response::Response, handlers::prefixes::PrefixError> {
handlers::delete_prefix(Path(prefix), State(state.prefix_store.clone())).await
}
pub async fn prefix_expand_handler(
State(state): State<Arc<AppState>>,
Json(req): Json<serde_json::Value>,
) -> Result<axum::response::Response, handlers::prefixes::PrefixError> {
handlers::expand_prefix(State(state.prefix_store.clone()), Json(req)).await
}
pub async fn liveness_handler() -> StatusCode {
StatusCode::OK
}
pub async fn readiness_handler(State(state): State<Arc<AppState>>) -> StatusCode {
match state.store.is_ready() {
true => StatusCode::OK,
false => StatusCode::SERVICE_UNAVAILABLE,
}
}
pub async fn ping_handler() -> &'static str {
"pong"
}
pub async fn server_info_handler(
State(state): State<Arc<AppState>>,
) -> Json<HashMap<String, serde_json::Value>> {
let mut info = HashMap::new();
info.insert("name".to_string(), serde_json::json!("OxiRS Fuseki"));
info.insert(
"version".to_string(),
serde_json::json!(env!("CARGO_PKG_VERSION")),
);
info.insert(
"datasets".to_string(),
serde_json::json!(state.config.datasets.len()),
);
info.insert(
"authentication".to_string(),
serde_json::json!(state.config.security.authentication.enabled),
);
info.insert(
"metrics".to_string(),
serde_json::json!(state.config.monitoring.metrics.enabled),
);
if let Some(metrics_service) = &state.metrics_service {
let summary = metrics_service.get_summary().await;
info.insert(
"uptime_seconds".to_string(),
serde_json::json!(summary.uptime_seconds),
);
info.insert(
"requests_total".to_string(),
serde_json::json!(summary.requests_total),
);
}
Json(info)
}
pub async fn performance_info_handler(
State(state): State<Arc<AppState>>,
) -> Result<Json<serde_json::Value>, FusekiError> {
if let Some(performance_service) = &state.performance_service {
let metrics = performance_service.get_metrics().await;
let cache_stats = performance_service.get_cache_stats().await;
let mut response = serde_json::to_value(metrics)
.map_err(|e| FusekiError::internal(format!("Failed to serialize metrics: {e}")))?;
if let serde_json::Value::Object(ref mut map) = response {
for (key, value) in cache_stats {
map.insert(key, value);
}
}
Ok(Json(response))
} else {
Err(FusekiError::service_unavailable(
"Performance service not available",
))
}
}
pub async fn cache_stats_handler(
State(state): State<Arc<AppState>>,
) -> Result<Json<HashMap<String, serde_json::Value>>, FusekiError> {
if let Some(performance_service) = &state.performance_service {
let stats = performance_service.get_cache_stats().await;
Ok(Json(stats))
} else {
Err(FusekiError::service_unavailable(
"Performance service not available",
))
}
}
pub async fn clear_cache_handler(
State(state): State<Arc<AppState>>,
) -> Result<Json<serde_json::Value>, FusekiError> {
if let Some(performance_service) = &state.performance_service {
performance_service.clear_caches().await;
Ok(Json(serde_json::json!(
{ "success" : true, "message" : "All caches cleared successfully",
"timestamp" : chrono::Utc::now() }
)))
} else {
Err(FusekiError::service_unavailable(
"Performance service not available",
))
}
}
pub async fn optimization_stats_handler(
State(state): State<Arc<AppState>>,
) -> Result<Json<serde_json::Value>, FusekiError> {
if let Some(query_optimizer) = &state.query_optimizer {
let stats = query_optimizer.get_optimization_stats().await;
let mut response = serde_json::Map::new();
response.insert("optimization_enabled".to_string(), serde_json::json!(true));
response.insert(
"timestamp".to_string(),
serde_json::json!(chrono::Utc::now()),
);
for (key, value) in stats {
response.insert(key, value);
}
Ok(Json(serde_json::Value::Object(response)))
} else {
Ok(Json(serde_json::json!(
{ "optimization_enabled" : false, "message" :
"Query optimization not enabled", "timestamp" : chrono::Utc::now() }
)))
}
}
pub async fn optimization_plans_handler(
State(state): State<Arc<AppState>>,
) -> Result<Json<serde_json::Value>, FusekiError> {
if let Some(_query_optimizer) = &state.query_optimizer {
Ok(Json(serde_json::json!(
{ "total_plans" : 0, "cached_plans" : 0, "hit_ratio" : 0.0,
"most_used_plans" : [], "optimization_types" : ["INDEX_OPTIMIZATION",
"JOIN_OPTIMIZATION", "PARALLELIZATION", "COST_BASED_OPTIMIZATION"],
"timestamp" : chrono::Utc::now() }
)))
} else {
Err(FusekiError::service_unavailable(
"Query optimizer not available",
))
}
}
pub async fn optimization_detailed_stats_handler(
State(state): State<Arc<AppState>>,
) -> Result<Json<serde_json::Value>, FusekiError> {
if let Some(query_optimizer) = &state.query_optimizer {
let optimization_stats = query_optimizer.get_optimization_stats().await;
Ok(Json(serde_json::json!(
{ "optimization_features" : { "cost_based_optimization" : true,
"join_order_optimization" : true, "index_aware_rewriting" : true,
"parallel_execution" : true, "query_plan_caching" : true,
"cardinality_estimation" : true }, "statistics" : optimization_stats,
"performance_impact" : { "average_improvement" : "60%",
"cache_hit_ratio" : "85%", "parallel_speedup" : "3.2x" },
"algorithms" : ["Dynamic Programming Join Optimization",
"Cost-based Plan Selection", "Selectivity Estimation",
"Index Selection", "Parallel Work Stealing"], "timestamp" :
chrono::Utc::now() }
)))
} else {
Err(FusekiError::service_unavailable(
"Query optimizer not available",
))
}
}
async fn metrics_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
if let Some(metrics_service) = &state.metrics_service {
#[cfg(feature = "metrics")]
{
match metrics_service.get_prometheus_metrics().await {
Ok(metrics_text) => (
[(
axum::http::header::CONTENT_TYPE,
"text/plain; charset=utf-8",
)],
metrics_text,
)
.into_response(),
Err(e) => (
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
format!("Failed to generate metrics: {e}"),
)
.into_response(),
}
}
#[cfg(not(feature = "metrics"))]
{
let summary = metrics_service.get_summary().await;
axum::Json(summary).into_response()
}
} else {
(
axum::http::StatusCode::SERVICE_UNAVAILABLE,
"Metrics service not available",
)
.into_response()
}
}
async fn metrics_summary_handler(State(state): State<Arc<AppState>>) -> impl IntoResponse {
if let Some(metrics_service) = &state.metrics_service {
let summary = metrics_service.get_summary().await;
axum::Json(summary).into_response()
} else {
axum::Json(serde_json::json!({ "error" : "Metrics service not available" })).into_response()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{config::ServerConfig, Store};
use std::net::SocketAddr;
fn create_test_runtime() -> Runtime {
let addr: SocketAddr = "127.0.0.1:0".parse().unwrap();
let store = Store::new().unwrap();
let config = ServerConfig::default();
Runtime::new(addr, store, config)
}
#[tokio::test]
async fn test_runtime_creation() {
let _runtime = create_test_runtime();
}
#[tokio::test]
async fn test_service_initialization() {
let mut _runtime = create_test_runtime();
}
#[test]
fn test_client_identifier_extraction() {
#[cfg(feature = "rate-limit")]
{
use axum::body::Body;
use axum::http::Request;
let request = Request::builder()
.header("x-forwarded-for", "192.168.1.1, 10.0.0.1")
.body(Body::empty())
.unwrap();
let client_id = extract_client_identifier(&request);
assert_eq!(client_id, "192.168.1.1");
}
}
#[tokio::test]
async fn test_health_endpoints() {
let state = AppState {
store: Store::new().unwrap(),
config: ServerConfig::default(),
auth_service: None,
metrics_service: None,
performance_service: None,
query_optimizer: None,
subscription_manager: None,
federation_manager: None,
streaming_manager: None,
concurrency_manager: None,
memory_manager: None,
batch_executor: None,
stream_manager: None,
dataset_manager: None,
security_auditor: None,
ddos_protector: None,
load_balancer: None,
edge_cache_manager: None,
performance_profiler: None,
notification_manager: None,
backup_manager: None,
recovery_manager: None,
disaster_recovery: None,
certificate_rotation: None,
http2_manager: None,
http3_manager: None,
adaptive_execution_engine: None,
rebac_manager: None,
prefix_store: Arc::new(handlers::PrefixStore::new()),
task_manager: Arc::new(handlers::TaskManager::new()),
request_logger: Arc::new(handlers::RequestLogger::new()),
startup_time: Instant::now(),
system_monitor: Arc::new(parking_lot::Mutex::new(sysinfo::System::new_all())),
#[cfg(feature = "rate-limit")]
rate_limiter: None,
};
let status = liveness_handler().await;
assert_eq!(status, StatusCode::OK);
let status = readiness_handler(State(Arc::new(state.clone()))).await;
assert_eq!(status, StatusCode::OK);
let health_response = health_handler(State(Arc::new(state))).await;
assert!(health_response.0.get("status").is_some());
}
}