use axum::{
extract::{Path, State},
http::StatusCode,
response::{Html, IntoResponse, Json},
routing::{delete, get, post},
Router,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::RwLock;
use tower::ServiceBuilder;
use tower_http::{
compression::CompressionLayer,
cors::{Any, CorsLayer},
};
/// Error type for all fallible dashboard operations.
#[derive(Debug, Error)]
pub enum DashboardError {
    /// Invalid configuration, e.g. an unparseable bind address.
    #[error("Configuration error: {0}")]
    ConfigError(String),
    /// Failure while binding or running the HTTP server.
    #[error("Server error: {0}")]
    ServerError(String),
    /// A requested resource does not exist.
    /// NOTE(review): not constructed in this file — confirm callers elsewhere.
    #[error("Resource not found: {0}")]
    NotFound(String),
    /// The client request was malformed.
    /// NOTE(review): not constructed in this file — confirm callers elsewhere.
    #[error("Invalid request: {0}")]
    InvalidRequest(String),
    /// Catch-all for errors that fit no other variant.
    #[error("Dashboard error: {0}")]
    Other(String),
}

/// Convenience alias used throughout the dashboard's API.
pub type Result<T> = std::result::Result<T, DashboardError>;
/// Runtime configuration for the visualization dashboard server.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DashboardConfig {
    /// Socket address the HTTP server binds to, e.g. "127.0.0.1:8080".
    pub bind_address: String,
    /// Enables a fully permissive CORS layer (any origin/method/header).
    pub enable_cors: bool,
    /// Enables response compression middleware.
    pub enable_compression: bool,
    /// Enables API-key authentication.
    /// NOTE(review): no middleware in this file enforces this flag — confirm
    /// where the check happens.
    pub enable_auth: bool,
    /// API key to require when `enable_auth` is set.
    pub api_key: Option<String>,
    /// Refresh interval in milliseconds.
    /// NOTE(review): not read by the server code in this file — presumably a
    /// hint consumed by the dashboard UI; verify.
    pub refresh_interval_ms: u64,
    /// Debug-mode toggle. NOTE(review): not read in this file.
    pub debug_mode: bool,
}
impl Default for DashboardConfig {
fn default() -> Self {
Self {
bind_address: "127.0.0.1:8080".to_string(),
enable_cors: true,
enable_compression: true,
enable_auth: false,
api_key: None,
refresh_interval_ms: 1000,
debug_mode: false,
}
}
}
impl DashboardConfig {
    /// Returns the config with its bind address replaced.
    pub fn with_bind_address(mut self, address: impl Into<String>) -> Self {
        let address = address.into();
        self.bind_address = address;
        self
    }

    /// Enables API-key authentication and stores the given key.
    pub fn with_auth(mut self, api_key: impl Into<String>) -> Self {
        self.api_key = Some(api_key.into());
        self.enable_auth = true;
        self
    }

    /// Sets the refresh interval, in milliseconds.
    pub fn with_refresh_interval(mut self, interval_ms: u64) -> Self {
        self.refresh_interval_ms = interval_ms;
        self
    }
}
/// Point-in-time snapshot of cluster-wide health and load, served as JSON
/// by the `/api/metrics` endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterMetrics {
    /// Total number of nodes known to the cluster.
    pub total_nodes: usize,
    /// Number of nodes currently reported healthy.
    pub healthy_nodes: usize,
    /// Id of the current leader node, if one is elected.
    pub leader_node_id: Option<u64>,
    /// Total number of stored triples across the cluster.
    pub total_triples: u64,
    /// Recent query throughput (queries per second).
    pub queries_per_second: f64,
    /// Average query latency, in milliseconds.
    pub avg_query_latency_ms: f64,
    /// Average replication lag, in milliseconds.
    pub avg_replication_lag_ms: f64,
    /// Aggregate CPU usage, as a percentage.
    pub cpu_usage_percent: f64,
    /// Aggregate memory usage, in bytes.
    pub memory_usage_bytes: u64,
    /// Network throughput. NOTE(review): `_bps` is ambiguous between bits
    /// and bytes per second — confirm with the metrics producer.
    pub network_throughput_bps: u64,
    /// Number of currently active alerts.
    pub active_alerts: usize,
    /// When this snapshot was taken (UTC).
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
/// Per-node status row served by `/api/nodes` and the topology endpoint.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeInfo {
    /// Unique node identifier; also the key in the dashboard's node map.
    pub node_id: u64,
    /// Network address of the node.
    pub address: String,
    /// Whether this node is the current leader (drives the "raft-follower"
    /// edges built in `DashboardState::update_connections`).
    pub is_leader: bool,
    /// Free-form health label. NOTE(review): stringly typed — an enum would
    /// be safer if the set of values is fixed; confirm with producers.
    pub health: String,
    /// Seconds since the node started.
    pub uptime_seconds: u64,
    /// Number of triples stored on this node.
    pub triple_count: u64,
    /// Node CPU usage, as a percentage.
    pub cpu_percent: f64,
    /// Node memory usage, in bytes.
    pub memory_bytes: u64,
    /// Timestamp of the last heartbeat received from this node (UTC).
    pub last_heartbeat: chrono::DateTime<chrono::Utc>,
}
/// A single alert entry, served by `/api/alerts` and acknowledgeable via
/// `/api/alerts/:alert_id/acknowledge`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertInfo {
    /// Unique alert identifier used for acknowledgement lookups.
    pub id: String,
    /// Severity label. NOTE(review): free-form string — confirm the value
    /// set with the alert producer.
    pub severity: String,
    /// Alert category label (free-form string).
    pub category: String,
    /// Short human-readable title.
    pub title: String,
    /// Detailed human-readable message.
    pub message: String,
    /// Originating node, if the alert is node-specific.
    pub node_id: Option<u64>,
    /// When the alert was raised (UTC).
    pub timestamp: chrono::DateTime<chrono::Utc>,
    /// Set to true by the acknowledge endpoint; alerts are never removed here.
    pub acknowledged: bool,
}
/// Shared state handed (by clone) to every axum handler. Cloning is cheap:
/// every collection sits behind an `Arc<RwLock<_>>`.
#[derive(Clone)]
struct DashboardState {
    // Retained but currently unread in this file (hence the dead_code allow).
    #[allow(dead_code)]
    config: DashboardConfig,
    // Latest cluster-wide metrics snapshot.
    metrics: Arc<RwLock<ClusterMetrics>>,
    // Known nodes, keyed by node_id.
    nodes: Arc<RwLock<HashMap<u64, NodeInfo>>>,
    // Alert history; new alerts are appended, acknowledged in place.
    alerts: Arc<RwLock<Vec<AlertInfo>>>,
    // Topology edges, rebuilt on demand by `update_connections`.
    connections: Arc<RwLock<Vec<Connection>>>,
}
impl DashboardState {
    /// Creates an empty state snapshot for a freshly started dashboard:
    /// zeroed metrics, no nodes, no alerts, no topology edges.
    fn new(config: DashboardConfig) -> Self {
        let initial_metrics = ClusterMetrics {
            total_nodes: 0,
            healthy_nodes: 0,
            leader_node_id: None,
            total_triples: 0,
            queries_per_second: 0.0,
            avg_query_latency_ms: 0.0,
            avg_replication_lag_ms: 0.0,
            cpu_usage_percent: 0.0,
            memory_usage_bytes: 0,
            network_throughput_bps: 0,
            active_alerts: 0,
            timestamp: chrono::Utc::now(),
        };
        Self {
            config,
            metrics: Arc::new(RwLock::new(initial_metrics)),
            nodes: Arc::new(RwLock::new(HashMap::new())),
            alerts: Arc::new(RwLock::new(Vec::new())),
            connections: Arc::new(RwLock::new(Vec::new())),
        }
    }

    /// Rebuilds the topology edge list from the current node set:
    /// first a "raft-follower" edge from every follower to the leader
    /// (if one exists), then a full-mesh "replication" edge for every
    /// unordered node pair (leader included).
    async fn update_connections(&self) {
        let nodes = self.nodes.read().await;
        let mut connections = self.connections.write().await;
        connections.clear();
        // Follower -> leader raft edges.
        if let Some(leader) = nodes.values().find(|n| n.is_leader).map(|n| n.node_id) {
            connections.extend(
                nodes
                    .values()
                    .filter(|n| n.node_id != leader)
                    .map(|n| Connection {
                        source: n.node_id,
                        target: leader,
                        connection_type: "raft-follower".to_string(),
                    }),
            );
        }
        // Full-mesh replication edges, one per unordered pair.
        let ids: Vec<u64> = nodes.keys().copied().collect();
        for (i, &a) in ids.iter().enumerate() {
            connections.extend(ids[i + 1..].iter().map(|&b| Connection {
                source: a,
                target: b,
                connection_type: "replication".to_string(),
            }));
        }
    }
}
/// HTTP server exposing the dashboard UI and JSON API, plus push-style
/// update methods (`update_metrics`, `update_node`, `add_alert`) for the
/// host process to feed it fresh data.
pub struct DashboardServer {
    // Server configuration (bind address, middleware toggles).
    config: DashboardConfig,
    // Shared state cloned into every request handler.
    state: DashboardState,
    // True while the server is considered started; makes start()/stop()
    // idempotent. Note: stop() only clears this flag — it does not
    // terminate the detached serve task spawned by start().
    running: Arc<RwLock<bool>>,
}
impl DashboardServer {
    /// Creates a dashboard server with empty state. Does not bind or listen;
    /// call [`DashboardServer::start`] for that.
    pub async fn new(config: DashboardConfig) -> Result<Self> {
        let state = DashboardState::new(config.clone());
        Ok(Self {
            config,
            state,
            running: Arc::new(RwLock::new(false)),
        })
    }

    /// Binds the configured address and serves the dashboard on a detached
    /// background task. Idempotent: returns `Ok(())` immediately if already
    /// running.
    ///
    /// # Errors
    /// Returns `DashboardError::ConfigError` when `bind_address` does not
    /// parse as a socket address, and `DashboardError::ServerError` when the
    /// TCP listener cannot be bound.
    pub async fn start(&self) -> Result<()> {
        let mut running = self.running.write().await;
        if *running {
            return Ok(());
        }
        tracing::info!(
            bind_address = %self.config.bind_address,
            "Starting visualization dashboard"
        );
        let app = self.build_router();
        let addr: SocketAddr = self
            .config
            .bind_address
            .parse()
            .map_err(|e| DashboardError::ConfigError(format!("Invalid bind address: {e}")))?;
        let listener = tokio::net::TcpListener::bind(addr)
            .await
            .map_err(|e| DashboardError::ServerError(format!("Failed to bind: {e}")))?;
        // BUGFIX: mark as running only after the bind succeeded. Previously
        // the flag was set before binding, so a bind failure left it true and
        // every later start() returned Ok(()) without ever starting a server.
        *running = true;
        tracing::info!("Dashboard server listening on {}", addr);
        tokio::spawn(async move {
            if let Err(e) = axum::serve(listener, app).await {
                tracing::error!("Dashboard server error: {}", e);
            }
        });
        Ok(())
    }

    /// Clears the running flag so `start()` can be called again.
    ///
    /// NOTE: the serve task spawned by `start()` is detached and is given no
    /// shutdown signal, so it keeps accepting connections until the process
    /// exits; this method only updates the bookkeeping flag.
    pub async fn stop(&self) -> Result<()> {
        let mut running = self.running.write().await;
        if !*running {
            return Ok(());
        }
        tracing::info!("Stopping visualization dashboard");
        *running = false;
        Ok(())
    }

    /// Assembles the axum router: UI root, JSON API routes, and the
    /// optional CORS/compression layers selected by the config.
    fn build_router(&self) -> Router {
        let mut router = Router::new()
            .route("/", get(dashboard_home))
            .route("/api/metrics", get(get_metrics))
            .route("/api/nodes", get(get_nodes))
            // These two registrations share a path; axum merges them into a
            // single method router (GET + DELETE).
            .route("/api/nodes/:node_id", get(get_node))
            .route("/api/nodes/:node_id", delete(delete_node))
            .route("/api/alerts", get(get_alerts))
            .route("/api/alerts/:alert_id/acknowledge", post(acknowledge_alert))
            .route("/api/health", get(health_check))
            .route("/api/topology", get(get_topology))
            .route("/api/queries", post(execute_query))
            .with_state(self.state.clone());
        // Extension point for shared middleware; currently contributes no layers.
        let middleware = ServiceBuilder::new();
        if self.config.enable_cors {
            // Fully permissive CORS — fine for an internal dashboard, revisit
            // before exposing publicly.
            let cors = CorsLayer::new()
                .allow_origin(Any)
                .allow_methods(Any)
                .allow_headers(Any);
            router = router.layer(cors);
        }
        if self.config.enable_compression {
            router = router.layer(CompressionLayer::new());
        }
        router.layer(middleware)
    }

    /// Replaces the cluster-wide metrics snapshot.
    pub async fn update_metrics(&self, metrics: ClusterMetrics) {
        let mut state_metrics = self.state.metrics.write().await;
        *state_metrics = metrics;
    }

    /// Inserts or overwrites a node entry, keyed by its `node_id`.
    pub async fn update_node(&self, node: NodeInfo) {
        let mut nodes = self.state.nodes.write().await;
        nodes.insert(node.node_id, node);
    }

    /// Appends an alert to the alert history (never pruned here).
    pub async fn add_alert(&self, alert: AlertInfo) {
        let mut alerts = self.state.alerts.write().await;
        alerts.push(alert);
    }
}
/// GET `/` — serves the single-page dashboard UI, embedded into the binary
/// at compile time from `../static/dashboard.html` (relative to this file).
async fn dashboard_home() -> Html<&'static str> {
    Html(include_str!("../static/dashboard.html"))
}
/// GET `/api/metrics` — returns the latest cluster-wide metrics snapshot.
async fn get_metrics(State(state): State<DashboardState>) -> Json<ClusterMetrics> {
    let snapshot = state.metrics.read().await.clone();
    Json(snapshot)
}
/// GET `/api/nodes` — returns every known node (map iteration order).
async fn get_nodes(State(state): State<DashboardState>) -> Json<Vec<NodeInfo>> {
    let guard = state.nodes.read().await;
    let list: Vec<NodeInfo> = guard.values().cloned().collect();
    Json(list)
}
/// GET `/api/nodes/:node_id` — returns a single node, or 404 if unknown.
async fn get_node(
    State(state): State<DashboardState>,
    Path(node_id): Path<u64>,
) -> impl IntoResponse {
    let guard = state.nodes.read().await;
    if let Some(node) = guard.get(&node_id) {
        (StatusCode::OK, Json(node.clone())).into_response()
    } else {
        StatusCode::NOT_FOUND.into_response()
    }
}
/// GET `/api/alerts` — returns the full alert history, oldest first.
async fn get_alerts(State(state): State<DashboardState>) -> Json<Vec<AlertInfo>> {
    let guard = state.alerts.read().await;
    Json(guard.to_vec())
}
/// POST `/api/alerts/:alert_id/acknowledge` — marks the matching alert as
/// acknowledged; 404 when no alert carries that id.
async fn acknowledge_alert(
    State(state): State<DashboardState>,
    Path(alert_id): Path<String>,
) -> StatusCode {
    let mut alerts = state.alerts.write().await;
    match alerts.iter_mut().find(|a| a.id == alert_id) {
        Some(alert) => {
            alert.acknowledged = true;
            StatusCode::OK
        }
        None => StatusCode::NOT_FOUND,
    }
}
/// GET `/api/health` — liveness probe; always reports "healthy" with the
/// current UTC timestamp.
async fn health_check() -> Json<HealthCheckResponse> {
    let body = HealthCheckResponse {
        status: String::from("healthy"),
        timestamp: chrono::Utc::now(),
    };
    Json(body)
}
/// JSON body returned by the `/api/health` endpoint.
#[derive(Serialize)]
struct HealthCheckResponse {
    // Always "healthy" as emitted by `health_check`.
    status: String,
    // Time the probe was answered (UTC).
    timestamp: chrono::DateTime<chrono::Utc>,
}
/// GET `/api/topology` — recomputes the edge list from the current node
/// set, then returns nodes plus connections in one payload.
async fn get_topology(State(state): State<DashboardState>) -> Json<TopologyResponse> {
    state.update_connections().await;
    // Hold both read locks together so the two halves are consistent.
    let nodes_guard = state.nodes.read().await;
    let connections_guard = state.connections.read().await;
    let response = TopologyResponse {
        nodes: nodes_guard.values().cloned().collect(),
        connections: connections_guard.clone(),
    };
    Json(response)
}
/// JSON body returned by the `/api/topology` endpoint.
#[derive(Serialize)]
struct TopologyResponse {
    // All known nodes at the time of the request.
    nodes: Vec<NodeInfo>,
    // Edges rebuilt by `DashboardState::update_connections`.
    connections: Vec<Connection>,
}
/// A directed topology edge between two nodes, identified by node id.
#[derive(Serialize, Clone)]
struct Connection {
    // Origin node id.
    source: u64,
    // Destination node id.
    target: u64,
    // Edge kind: "raft-follower" (follower -> leader) or "replication"
    // (full-mesh pair), as built in `update_connections`.
    connection_type: String,
}
/// JSON request body for POST `/api/queries`.
#[derive(Deserialize)]
struct QueryRequest {
    // Query text; classified by keyword substring in `execute_query`.
    query: String,
}
/// JSON response body for POST `/api/queries`.
#[derive(Serialize)]
struct QueryResponse {
    // Result rows as variable-name -> value maps.
    results: Vec<HashMap<String, String>>,
    // Wall-clock execution time measured server-side, in milliseconds.
    execution_time_ms: f64,
}
/// POST `/api/queries` — mock query endpoint.
///
/// Classifies the query by case-insensitive keyword substring (SELECT /
/// ASK / CONSTRUCT / DESCRIBE) and fabricates plausible result rows with
/// artificial latency; no real query engine is invoked. Note the substring
/// matching is intentionally naive (e.g. any text containing "select"
/// takes the SELECT path).
async fn execute_query(
    State(state): State<DashboardState>,
    Json(request): Json<QueryRequest>,
) -> Json<QueryResponse> {
    use std::time::Instant;
    let started_at = Instant::now();
    tracing::info!("Executing query: {}", request.query);
    let lowered = request.query.to_lowercase();
    let mut rows: Vec<HashMap<String, String>> = Vec::new();
    if lowered.contains("select") {
        if lowered.contains("*") {
            // Row count scales with cluster size, capped at 100.
            let registered = state.nodes.read().await.len();
            let row_count = std::cmp::min(registered * 10, 100);
            for i in 0..row_count {
                rows.push(HashMap::from([
                    (
                        "subject".to_string(),
                        format!("http://example.org/node{}", i),
                    ),
                    (
                        "predicate".to_string(),
                        "http://example.org/hasValue".to_string(),
                    ),
                    ("object".to_string(), format!("\"value{}\"", i)),
                ]));
            }
        } else {
            rows.extend(
                (0..10).map(|i| HashMap::from([("result".to_string(), format!("binding_{}", i))])),
            );
        }
        tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
    } else if lowered.contains("ask") {
        rows.push(HashMap::from([("result".to_string(), "true".to_string())]));
        tokio::time::sleep(tokio::time::Duration::from_millis(20)).await;
    } else if lowered.contains("construct") || lowered.contains("describe") {
        for i in 0..20 {
            rows.push(HashMap::from([
                (
                    "subject".to_string(),
                    format!("http://example.org/resource{}", i),
                ),
                (
                    "predicate".to_string(),
                    "http://www.w3.org/1999/02/22-rdf-syntax-ns#type".to_string(),
                ),
                ("object".to_string(), "http://example.org/Type".to_string()),
            ]));
        }
        tokio::time::sleep(tokio::time::Duration::from_millis(80)).await;
    } else {
        // Unknown query form: single error row, no artificial delay.
        rows.push(HashMap::from([(
            "error".to_string(),
            "Unsupported query type or syntax error".to_string(),
        )]));
    }
    let elapsed_ms = started_at.elapsed().as_secs_f64() * 1000.0;
    tracing::info!(
        "Query executed in {:.2}ms, returned {} results",
        elapsed_ms,
        rows.len()
    );
    Json(QueryResponse {
        results: rows,
        execution_time_ms: elapsed_ms,
    })
}
/// DELETE `/api/nodes/:node_id` — removes a node from the registry;
/// 404 when the id is unknown.
async fn delete_node(State(state): State<DashboardState>, Path(node_id): Path<u64>) -> StatusCode {
    let removed = state.nodes.write().await.remove(&node_id);
    match removed {
        Some(_) => StatusCode::OK,
        None => StatusCode::NOT_FOUND,
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Defaults: loopback bind, CORS/compression on, auth off.
    #[test]
    fn test_dashboard_config_default() {
        let cfg = DashboardConfig::default();
        assert_eq!(cfg.bind_address, "127.0.0.1:8080");
        assert!(cfg.enable_cors && cfg.enable_compression);
        assert!(!cfg.enable_auth);
    }

    /// Builder methods set address, auth key, and refresh interval.
    #[test]
    fn test_dashboard_config_builder() {
        let cfg = DashboardConfig::default()
            .with_bind_address("0.0.0.0:9000")
            .with_auth("test-api-key")
            .with_refresh_interval(500);
        assert_eq!(cfg.bind_address, "0.0.0.0:9000");
        assert!(cfg.enable_auth);
        assert_eq!(cfg.api_key.as_deref(), Some("test-api-key"));
        assert_eq!(cfg.refresh_interval_ms, 500);
    }

    /// Constructing the server with defaults succeeds.
    #[tokio::test]
    async fn test_dashboard_server_creation() {
        let outcome = DashboardServer::new(DashboardConfig::default()).await;
        assert!(outcome.is_ok());
    }

    /// A fresh state starts with zeroed metrics and an empty node map.
    #[tokio::test]
    async fn test_dashboard_state() {
        let state = DashboardState::new(DashboardConfig::default());
        assert_eq!(state.metrics.read().await.total_nodes, 0);
        assert!(state.nodes.read().await.is_empty());
    }
}