use crate::server::AppState;
use axum::{
extract::State,
http::StatusCode,
response::Json,
};
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::instrument;
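// A minimal sketch of how these handlers might be mounted on an axum
// `Router` (route paths here are illustrative assumptions; the real wiring
// lives elsewhere in the crate):
//
//     use axum::routing::{get, post};
//     use axum::Router;
//
//     fn performance_routes() -> Router<Arc<AppState>> {
//         Router::new()
//             .route("/performance/stats", get(get_performance_stats))
//             .route("/performance/memory", get(get_memory_stats))
//             .route("/performance/concurrency", get(get_concurrency_stats))
//             .route("/performance/gc", post(trigger_gc))
//             .route("/performance/health", get(beta2_health_check))
//     }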
#[derive(Debug, Serialize)]
pub struct PerformanceStats {
pub concurrency: Option<ConcurrencyStats>,
pub memory: Option<MemoryStats>,
pub batching: Option<BatchingStats>,
pub streaming: Option<StreamingStats>,
pub datasets: Option<DatasetStats>,
pub system: SystemStats,
}
#[derive(Debug, Serialize)]
pub struct ConcurrencyStats {
pub active_requests: usize,
pub queued_requests: usize,
pub total_requests: u64,
pub completed_requests: u64,
pub failed_requests: u64,
pub rejected_requests: u64,
pub average_wait_time_ms: f64,
pub average_execution_time_ms: f64,
pub current_load: f64,
}
#[derive(Debug, Serialize)]
pub struct MemoryStats {
pub total_allocated: u64,
pub total_deallocated: u64,
pub current_usage: u64,
pub peak_usage: u64,
pub active_objects: usize,
pub pooled_objects: usize,
pub pool_hit_ratio: f64,
pub memory_pressure: f64,
pub gc_runs: u64,
pub last_gc_duration_ms: u64,
}
#[derive(Debug, Serialize)]
pub struct BatchingStats {
pub total_batches: u64,
pub total_queries: u64,
pub average_batch_size: f64,
pub average_wait_time_ms: f64,
pub average_execution_time_ms: f64,
pub parallel_efficiency: f64,
pub queries_per_second: f64,
}
#[derive(Debug, Serialize)]
pub struct StreamingStats {
pub total_bytes: u64,
pub total_chunks: u64,
pub total_rows: u64,
pub compression_ratio: f64,
pub average_chunk_size: f64,
pub throughput_mbps: f64,
pub active_streams: usize,
pub backpressure_events: u64,
}
#[derive(Debug, Serialize)]
pub struct DatasetStats {
pub total_datasets: usize,
pub total_snapshots: usize,
pub active_operations: usize,
pub pending_backups: usize,
}
#[derive(Debug, Serialize)]
pub struct SystemStats {
pub uptime_seconds: u64,
pub cpu_usage_percent: f64,
pub memory_usage_mb: f64,
pub worker_threads: usize,
}
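/// GET handler that aggregates statistics from every configured performance
/// subsystem; sections whose manager is not configured serialize as `null`.
///
/// An illustrative response body (field values invented for the example):
///
/// ```json
/// {
///   "concurrency": { "active_requests": 2, "queued_requests": 0, "...": "..." },
///   "memory": null,
///   "batching": null,
///   "streaming": null,
///   "datasets": null,
///   "system": {
///     "uptime_seconds": 3600,
///     "cpu_usage_percent": 12.5,
///     "memory_usage_mb": 512.0,
///     "worker_threads": 8
///   }
/// }
/// ```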
#[instrument(skip(state))]
pub async fn get_performance_stats(
State(state): State<Arc<AppState>>,
) -> Result<Json<PerformanceStats>, StatusCode> {
let concurrency_stats = if let Some(ref manager) = state.concurrency_manager {
let stats = manager.get_stats().await;
Some(ConcurrencyStats {
active_requests: stats.active_requests,
queued_requests: stats.queued_requests,
total_requests: stats.total_requests,
completed_requests: stats.completed_requests,
failed_requests: stats.failed_requests,
rejected_requests: stats.rejected_requests,
average_wait_time_ms: stats.average_wait_time_ms,
average_execution_time_ms: stats.average_execution_time_ms,
current_load: stats.current_load,
})
} else {
None
};
let memory_stats = if let Some(ref manager) = state.memory_manager {
let stats = manager.get_stats().await;
Some(MemoryStats {
total_allocated: stats.total_allocated,
total_deallocated: stats.total_deallocated,
current_usage: stats.current_usage,
peak_usage: stats.peak_usage,
active_objects: stats.active_objects,
pooled_objects: stats.pooled_objects,
pool_hit_ratio: stats.pool_hit_ratio,
memory_pressure: stats.memory_pressure,
gc_runs: stats.gc_runs,
last_gc_duration_ms: stats.last_gc_duration_ms,
})
} else {
None
};
let batching_stats = if let Some(ref executor) = state.batch_executor {
let stats = executor.get_stats().await;
Some(BatchingStats {
total_batches: stats.total_batches,
total_queries: stats.total_queries,
average_batch_size: stats.average_batch_size,
average_wait_time_ms: stats.average_wait_time_ms,
average_execution_time_ms: stats.average_execution_time_ms,
parallel_efficiency: stats.parallel_efficiency,
queries_per_second: stats.queries_per_second,
})
} else {
None
};
let streaming_stats = if let Some(ref manager) = state.stream_manager {
let stats = manager.get_stats().await;
Some(StreamingStats {
total_bytes: stats.total_bytes,
total_chunks: stats.total_chunks,
total_rows: stats.total_rows,
compression_ratio: stats.compression_ratio,
average_chunk_size: stats.average_chunk_size,
throughput_mbps: stats.throughput_mbps,
active_streams: stats.active_streams,
backpressure_events: stats.backpressure_events,
})
} else {
None
};
let dataset_stats = if let Some(ref manager) = state.dataset_manager {
let stats = manager.get_stats().await;
Some(DatasetStats {
total_datasets: stats.total_datasets,
total_snapshots: stats.total_snapshots,
active_operations: stats.active_operations,
pending_backups: stats.pending_backups,
})
} else {
None
};
// Take a synchronous snapshot of system metrics; the guard is dropped as
// soon as the readings are captured, so the lock is never held across an
// await point.
let (cpu_usage_percent, memory_usage_mb) = {
let mut system = state.system_monitor.lock();
system.refresh_all();
(
system.global_cpu_usage() as f64,
system.used_memory() as f64 / (1024.0 * 1024.0),
)
};
let system_stats = SystemStats {
uptime_seconds: state.startup_time.elapsed().as_secs(),
cpu_usage_percent,
memory_usage_mb,
// Reported as the logical CPU count, which may differ from the actual
// number of runtime worker threads.
worker_threads: num_cpus::get(),
};
Ok(Json(PerformanceStats {
concurrency: concurrency_stats,
memory: memory_stats,
batching: batching_stats,
streaming: streaming_stats,
datasets: dataset_stats,
system: system_stats,
}))
}
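/// GET handler returning memory-manager statistics, or 503 when no memory
/// manager is configured.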
#[instrument(skip(state))]
pub async fn get_memory_stats(
State(state): State<Arc<AppState>>,
) -> Result<Json<MemoryStats>, StatusCode> {
if let Some(ref manager) = state.memory_manager {
let stats = manager.get_stats().await;
Ok(Json(MemoryStats {
total_allocated: stats.total_allocated,
total_deallocated: stats.total_deallocated,
current_usage: stats.current_usage,
peak_usage: stats.peak_usage,
active_objects: stats.active_objects,
pooled_objects: stats.pooled_objects,
pool_hit_ratio: stats.pool_hit_ratio,
memory_pressure: stats.memory_pressure,
gc_runs: stats.gc_runs,
last_gc_duration_ms: stats.last_gc_duration_ms,
}))
} else {
Err(StatusCode::SERVICE_UNAVAILABLE)
}
}
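/// GET handler returning concurrency-manager statistics, or 503 when no
/// concurrency manager is configured.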
#[instrument(skip(state))]
pub async fn get_concurrency_stats(
State(state): State<Arc<AppState>>,
) -> Result<Json<ConcurrencyStats>, StatusCode> {
if let Some(ref manager) = state.concurrency_manager {
let stats = manager.get_stats().await;
Ok(Json(ConcurrencyStats {
active_requests: stats.active_requests,
queued_requests: stats.queued_requests,
total_requests: stats.total_requests,
completed_requests: stats.completed_requests,
failed_requests: stats.failed_requests,
rejected_requests: stats.rejected_requests,
average_wait_time_ms: stats.average_wait_time_ms,
average_execution_time_ms: stats.average_execution_time_ms,
current_load: stats.current_load,
}))
} else {
Err(StatusCode::SERVICE_UNAVAILABLE)
}
}
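/// POST handler that forces a garbage-collection pass on the memory
/// manager: 200 on success, 500 if collection fails, 503 when no memory
/// manager is configured.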
#[instrument(skip(state))]
pub async fn trigger_gc(State(state): State<Arc<AppState>>) -> Result<StatusCode, StatusCode> {
if let Some(ref manager) = state.memory_manager {
manager
.force_gc()
.await
.map(|_| StatusCode::OK)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
} else {
Err(StatusCode::SERVICE_UNAVAILABLE)
}
}
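// Hedged usage sketch for the GC endpoint (path and port are assumptions;
// actual routing is defined elsewhere in the crate):
//
//     curl -X POST "http://localhost:<port>/performance/gc"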
#[derive(Debug, Serialize)]
pub struct Beta2HealthCheck {
pub concurrency_manager: bool,
pub memory_manager: bool,
pub batch_executor: bool,
pub stream_manager: bool,
pub dataset_manager: bool,
pub all_healthy: bool,
}
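/// Reports which performance subsystems are configured. This checks only
/// that each manager is present, not that it is currently responsive.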
#[instrument(skip(state))]
pub async fn beta2_health_check(State(state): State<Arc<AppState>>) -> Json<Beta2HealthCheck> {
let concurrency_ok = state.concurrency_manager.is_some();
let memory_ok = state.memory_manager.is_some();
let batch_ok = state.batch_executor.is_some();
let stream_ok = state.stream_manager.is_some();
let dataset_ok = state.dataset_manager.is_some();
let all_healthy = concurrency_ok && memory_ok && batch_ok && stream_ok && dataset_ok;
Json(Beta2HealthCheck {
concurrency_manager: concurrency_ok,
memory_manager: memory_ok,
batch_executor: batch_ok,
stream_manager: stream_ok,
dataset_manager: dataset_ok,
all_healthy,
})
}
#[derive(Debug, Serialize)]
pub struct ProfilerReport {
pub enabled: bool,
pub total_profiles: usize,
pub query_profiles: Vec<QueryProfile>,
pub operation_stats: Vec<OperationStat>,
pub recommendations: Vec<String>,
}
#[derive(Debug, Serialize)]
pub struct QueryProfile {
pub query_id: String,
pub execution_time_ms: f64,
pub phases: Vec<String>,
pub bottlenecks: Vec<String>,
}
#[derive(Debug, Serialize)]
pub struct OperationStat {
pub operation: String,
pub count: u64,
pub avg_duration_ms: f64,
pub min_duration_ms: f64,
pub max_duration_ms: f64,
pub p50_ms: f64,
pub p95_ms: f64,
pub p99_ms: f64,
}
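/// GET handler building a profiler report over a 3600-second lookback
/// window, with per-query profiles annotated by `detect_bottlenecks`.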
#[instrument(skip(state))]
pub async fn profiler_report_handler(
State(state): State<Arc<AppState>>,
) -> Result<Json<ProfilerReport>, StatusCode> {
if let Some(ref profiler) = state.performance_profiler {
match profiler.generate_report(3600).await {
Ok(report) => {
Ok(Json(ProfilerReport {
enabled: true,
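// Detailed profiles exist only for slow queries, so their count doubles
// as the total here.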
total_profiles: report.slow_queries.len(),
query_profiles: report
.slow_queries
.into_iter()
.map(|p| {
let bottlenecks = detect_bottlenecks(&p.phases);
QueryProfile {
query_id: p.id.clone(),
execution_time_ms: p.execution_time_ms as f64,
phases: p.phases.iter().map(|ph| ph.name.clone()).collect(),
bottlenecks,
}
})
.collect(),
operation_stats: report
.top_operations
.into_iter()
.map(|op| OperationStat {
operation: op.operation,
count: op.execution_count,
avg_duration_ms: op.avg_time_ms,
min_duration_ms: op.min_time_ms as f64,
max_duration_ms: op.max_time_ms as f64,
// True percentiles are not tracked; p50 is approximated by the mean and
// p95/p99 by scaling the observed maximum.
p50_ms: op.avg_time_ms,
p95_ms: op.max_time_ms as f64 * 0.95,
p99_ms: op.max_time_ms as f64 * 0.99,
})
.collect(),
recommendations: report.recommendations,
}))
}
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
} else {
Err(StatusCode::SERVICE_UNAVAILABLE)
}
}
#[derive(Debug, Serialize)]
pub struct QueryStats {
pub total_queries: u64,
pub avg_execution_time_ms: f64,
pub slow_queries: Vec<SlowQuery>,
}
#[derive(Debug, Serialize)]
pub struct SlowQuery {
pub query_id: String,
pub execution_time_ms: f64,
pub timestamp: String,
}
#[instrument(skip(state))]
pub async fn profiler_query_stats_handler(
State(state): State<Arc<AppState>>,
) -> Result<Json<QueryStats>, StatusCode> {
if let Some(ref profiler) = state.performance_profiler {
let stats_map = profiler.get_query_statistics().await;
let total_queries = stats_map.get("total_queries").copied().unwrap_or(0);
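// The statistics map stores integer values, so any sub-millisecond
// precision on the average is lost in the cast below.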
let avg_execution_time_ms =
stats_map.get("avg_execution_time_ms").copied().unwrap_or(0) as f64;
match profiler.generate_report(3600).await {
Ok(report) => {
let slow_queries = report
.slow_queries
.into_iter()
.map(|q| SlowQuery {
query_id: q.id,
execution_time_ms: q.execution_time_ms as f64,
timestamp: q.timestamp.to_rfc3339(),
})
.collect();
Ok(Json(QueryStats {
total_queries,
avg_execution_time_ms,
slow_queries,
}))
}
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
} else {
Err(StatusCode::SERVICE_UNAVAILABLE)
}
}
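/// Resetting profiler state is not yet supported: responds
/// `501 Not Implemented` when a profiler is configured and
/// `503 Service Unavailable` otherwise.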
#[instrument(skip(state))]
pub async fn profiler_reset_handler(
State(state): State<Arc<AppState>>,
) -> Result<StatusCode, StatusCode> {
if state.performance_profiler.is_some() {
Ok(StatusCode::NOT_IMPLEMENTED)
} else {
Err(StatusCode::SERVICE_UNAVAILABLE)
}
}
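/// Scans a query's execution phases with simple threshold heuristics and
/// returns at most five human-readable bottleneck findings.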
fn detect_bottlenecks(phases: &[crate::performance_profiler::ExecutionPhase]) -> Vec<String> {
if phases.is_empty() {
return Vec::new();
}
let mut bottlenecks = Vec::new();
let total_time: u64 = phases.iter().map(|p| p.duration_ms).sum();
if total_time == 0 {
return bottlenecks;
}
let avg_duration = total_time as f64 / phases.len() as f64;
let total_cpu: u64 = phases.iter().map(|p| p.cpu_time_ms).sum();
let total_memory: u64 = phases.iter().map(|p| p.memory_bytes).sum();
for phase in phases {
let duration_pct = (phase.duration_ms as f64 / total_time as f64) * 100.0;
let cpu_pct = if total_cpu > 0 {
(phase.cpu_time_ms as f64 / total_cpu as f64) * 100.0
} else {
0.0
};
let memory_pct = if total_memory > 0 {
(phase.memory_bytes as f64 / total_memory as f64) * 100.0
} else {
0.0
};
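// Heuristic: a phase consuming more than 30% of total wall-clock time
// dominates the query.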
if duration_pct > 30.0 {
bottlenecks.push(format!(
"Phase '{}' takes {:.1}% of total execution time ({} ms)",
phase.name, duration_pct, phase.duration_ms
));
}
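// Heuristic: a phase more than 3x slower than the mean phase duration is
// an outlier.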
if phase.duration_ms as f64 > avg_duration * 3.0 {
bottlenecks.push(format!(
"Phase '{}' is {:.1}x slower than average phase duration",
phase.name,
phase.duration_ms as f64 / avg_duration
));
}
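// Heuristic: more than 40% of total CPU time marks a CPU-intensive phase.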
if cpu_pct > 40.0 {
bottlenecks.push(format!(
"Phase '{}' is CPU-intensive ({:.1}% of total CPU time)",
phase.name, cpu_pct
));
}
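// Heuristic: more than 50% of attributed memory marks a memory-intensive
// phase.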
if memory_pct > 50.0 {
let memory_mb = phase.memory_bytes as f64 / (1024.0 * 1024.0);
bottlenecks.push(format!(
"Phase '{}' is memory-intensive ({:.1} MB, {:.1}% of total memory)",
phase.name, memory_mb, memory_pct
));
}
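// A low CPU-to-wall-clock ratio on a non-trivial phase (>100 ms) suggests
// the phase is I/O bound or blocked waiting.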
if phase.duration_ms > 0 && phase.cpu_time_ms > 0 {
let cpu_efficiency = (phase.cpu_time_ms as f64 / phase.duration_ms as f64) * 100.0;
if cpu_efficiency < 20.0 && phase.duration_ms > 100 {
bottlenecks.push(format!(
"Phase '{}' has low CPU efficiency ({:.1}%), likely I/O bound or waiting",
phase.name, cpu_efficiency
));
}
}
}
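// Sort lexicographically so dedup can drop repeats, then cap at five
// findings. Note the cap keeps the first five alphabetically, not the
// most severe.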
bottlenecks.sort();
bottlenecks.dedup();
bottlenecks.truncate(5);
bottlenecks
}
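// A hedged unit-test sketch for `detect_bottlenecks`, assuming
// `ExecutionPhase` can be constructed with exactly the four fields used
// above (name, duration_ms, cpu_time_ms, memory_bytes); adjust to the real
// struct definition before enabling:
//
//     #[cfg(test)]
//     mod tests {
//         use super::*;
//         use crate::performance_profiler::ExecutionPhase;
//
//         #[test]
//         fn dominant_phase_is_flagged() {
//             let phases = vec![
//                 ExecutionPhase { name: "parse".into(), duration_ms: 10, cpu_time_ms: 8, memory_bytes: 1024 },
//                 ExecutionPhase { name: "join".into(), duration_ms: 900, cpu_time_ms: 850, memory_bytes: 1 << 20 },
//             ];
//             let findings = detect_bottlenecks(&phases);
//             assert!(findings.iter().any(|f| f.contains("join")));
//         }
//     }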
/// Typed response shape for optimization statistics. Note: this struct is
/// not referenced by `optimization_stats_handler` below, which returns the
/// optimizer's raw JSON map instead.
#[derive(Debug, Serialize)]
pub struct OptimizationStatsResponse {
pub cached_plans: usize,
pub total_triples: u64,
pub indexed_predicates: usize,
pub last_updated: String,
}
#[instrument(skip(state))]
pub async fn optimization_stats_handler(
State(state): State<Arc<AppState>>,
) -> Result<Json<HashMap<String, serde_json::Value>>, StatusCode> {
if let Some(ref optimizer) = state.query_optimizer {
let stats = optimizer.get_optimization_stats().await;
Ok(Json(stats))
} else {
Err(StatusCode::SERVICE_UNAVAILABLE)
}
}
#[instrument(skip(state))]
pub async fn optimization_plans_handler(
State(state): State<Arc<AppState>>,
) -> Result<Json<Vec<crate::optimization::OptimizedQueryPlan>>, StatusCode> {
if let Some(ref optimizer) = state.query_optimizer {
let plans = optimizer.get_cached_plans().await;
Ok(Json(plans))
} else {
Err(StatusCode::SERVICE_UNAVAILABLE)
}
}
#[instrument(skip(state))]
pub async fn clear_optimization_cache_handler(
State(state): State<Arc<AppState>>,
) -> Result<Json<serde_json::Value>, StatusCode> {
if let Some(ref optimizer) = state.query_optimizer {
let cleared_count = optimizer.clear_plan_cache().await;
Ok(Json(serde_json::json!({
"success": true,
"cleared_plans": cleared_count,
"message": format!("Cleared {} cached query plans", cleared_count)
})))
} else {
Err(StatusCode::SERVICE_UNAVAILABLE)
}
}
#[instrument(skip(state))]
pub async fn database_statistics_handler(
State(state): State<Arc<AppState>>,
) -> Result<Json<crate::optimization::DatabaseStatistics>, StatusCode> {
if let Some(ref optimizer) = state.query_optimizer {
let stats = optimizer.get_database_statistics().await;
Ok(Json(stats))
} else {
Err(StatusCode::SERVICE_UNAVAILABLE)
}
}