#[cfg(feature = "dashboard")]
use axum::{
body::Body,
extract::State,
http::{header, HeaderValue, Request, Response, StatusCode, Uri},
middleware::{self, Next},
response::IntoResponse,
routing::get,
Json, Router,
};
#[cfg(feature = "dashboard")]
use rust_embed::RustEmbed;
#[cfg(feature = "dashboard")]
use serde::Serialize;
#[cfg(feature = "dashboard")]
use std::sync::Arc;
#[cfg(feature = "dashboard")]
use crate::raft_api::RaftApiState;
/// Static dashboard files compiled into the binary through the `RustEmbed` derive.
///
/// The `folder` attribute points the derive at the `dashboard/` directory
/// (NOTE(review): presumably resolved relative to the crate root; confirm
/// against the rust-embed version in use). Files are looked up by their
/// relative path via `DashboardAssets::get`, which is how `static_handler`
/// serves them.
#[cfg(feature = "dashboard")]
#[derive(RustEmbed)]
#[folder = "dashboard/"]
struct DashboardAssets;
/// Snapshot returned as JSON by the `/dashboard/data` endpoint.
///
/// Assembled by `dashboard_data_handler` on every request.
#[cfg(feature = "dashboard")]
#[derive(Debug, Serialize)]
pub struct DashboardData {
/// One entry per topic name returned by the topic manager's `list_topics()`.
pub topics: Vec<TopicInfo>,
/// One entry per group returned by the offset manager's `list_groups()`.
pub consumer_groups: Vec<ConsumerGroupInfo>,
/// Value of `ServerStats::get_active_connections()` when the snapshot was built.
pub active_connections: u64,
/// Value of `ServerStats::get_total_requests()` when the snapshot was built.
pub total_requests: u64,
/// Server uptime in whole seconds (`ServerStats::uptime().as_secs()`).
pub uptime_secs: u64,
/// Wall-clock time of the snapshot in milliseconds since the Unix epoch;
/// `0` if the system clock reads earlier than the epoch.
pub timestamp: u64,
}
/// Per-topic summary shown on the dashboard.
#[cfg(feature = "dashboard")]
#[derive(Debug, Serialize)]
pub struct TopicInfo {
/// Topic name as returned by the topic manager.
pub name: String,
/// Number of partitions. The handler in this file reports `1` when the
/// topic cannot be fetched.
pub partitions: u32,
/// Replication factor. The handler in this file always reports `1`; it does
/// not read the value from the topic.
pub replication_factor: u16,
/// Sum of `PartitionOffset::count` over all partitions of the topic.
pub message_count: u64,
/// Offset range of each partition; empty when the topic cannot be fetched.
pub partition_offsets: Vec<PartitionOffset>,
}
/// Offset range of a single partition.
#[cfg(feature = "dashboard")]
#[derive(Debug, Serialize)]
pub struct PartitionOffset {
/// Partition id within its topic.
pub partition: u32,
/// Earliest offset of the partition; the handler substitutes `0` when
/// `earliest_offset()` yields no value.
pub earliest: u64,
/// Latest offset of the partition as returned by `latest_offset()`.
pub latest: u64,
/// `latest - earliest`, clamped at zero by the handler.
pub count: u64,
}
/// Per-consumer-group summary shown on the dashboard.
#[cfg(feature = "dashboard")]
#[derive(Debug, Serialize)]
pub struct ConsumerGroupInfo {
/// Group identifier as returned by the offset manager's `list_groups()`.
pub group_id: String,
/// Group state label. The handler in this file always reports `"Stable"`.
pub state: String,
/// Number of members. The handler in this file always reports `0`.
pub member_count: usize,
/// Topic names that appear as keys in the group's stored offsets.
pub topics: Vec<String>,
/// Sum of `latest - stored offset` (clamped at zero) over every partition
/// the group has an offset for and that can still be resolved.
pub total_lag: u64,
}
/// Shared state handed to the dashboard router via `Router::with_state`.
///
/// `Clone` is derived because the state is passed to the router by value and
/// extracted per request through `State<DashboardState>`.
#[cfg(feature = "dashboard")]
#[derive(Clone)]
pub struct DashboardState {
/// Raft API state. NOTE(review): not read by the handlers visible in this
/// file; confirm whether another consumer needs it.
pub raft_state: RaftApiState,
/// Server counters: source of active connections, total requests and uptime.
pub stats: Arc<crate::cluster_server::ServerStats>,
/// Used to list topics and read per-partition offsets.
pub topic_manager: rivven_core::TopicManager,
/// Used to list consumer groups and read their stored offsets.
pub offset_manager: rivven_core::OffsetManager,
}
/// Axum middleware that stamps browser-hardening headers onto every response.
///
/// The wrapped service runs first. Afterwards each header is written with
/// `insert`, so a value the inner handler set under the same name is replaced:
///
/// * `X-Content-Type-Options: nosniff`
/// * `X-Frame-Options: DENY`
/// * `Content-Security-Policy` (same-origin only, see the constant below)
/// * `Referrer-Policy: strict-origin-when-cross-origin`
/// * `Permissions-Policy` with geolocation, microphone and camera disabled
#[cfg(feature = "dashboard")]
async fn security_headers_middleware(request: Request<Body>, next: Next) -> Response<Body> {
    // Everything is restricted to the serving origin. Scripts and styles also
    // allow `'unsafe-inline'`, and images additionally allow `data:` URIs.
    const CONTENT_SECURITY_POLICY_VALUE: &str = "default-src 'self'; \
        script-src 'self' 'unsafe-inline'; \
        style-src 'self' 'unsafe-inline'; \
        img-src 'self' data:; \
        connect-src 'self'; \
        font-src 'self'; \
        frame-ancestors 'none'";

    let mut response = next.run(request).await;

    // Headers that have a typed constant in `http::header`, in insertion order.
    let typed_headers = [
        (header::X_CONTENT_TYPE_OPTIONS, "nosniff"),
        (header::X_FRAME_OPTIONS, "DENY"),
        (header::CONTENT_SECURITY_POLICY, CONTENT_SECURITY_POLICY_VALUE),
        (header::REFERRER_POLICY, "strict-origin-when-cross-origin"),
    ];

    let response_headers = response.headers_mut();
    for (name, value) in typed_headers {
        response_headers.insert(name, HeaderValue::from_static(value));
    }

    // No typed constant is used for this one, so the name is given as a string.
    response_headers.insert(
        "Permissions-Policy",
        HeaderValue::from_static("geolocation=(), microphone=(), camera=()"),
    );

    response
}
/// Builds the dashboard `Router`.
///
/// * `GET /dashboard/data` is served by `dashboard_data_handler`.
/// * Every other request falls through to `static_handler`.
/// * `security_headers_middleware` is layered over the routes registered
///   above it.
///
/// `state` is attached last, yielding a router with no outstanding state
/// requirement.
#[cfg(feature = "dashboard")]
pub fn create_dashboard_router(state: DashboardState) -> Router {
    let security_headers = middleware::from_fn(security_headers_middleware);

    let routes = Router::new()
        .route("/dashboard/data", get(dashboard_data_handler))
        .fallback(static_handler);

    routes.layer(security_headers).with_state(state)
}
/// Fallback handler that serves the embedded dashboard files.
///
/// Path resolution:
/// * leading `/` characters are stripped from the request path;
/// * an empty path, or one without a `.`, is treated as a client-side route
///   and answered with `index.html`;
/// * anything else is looked up verbatim in [`DashboardAssets`].
///
/// Responses:
/// * `200 OK` with a `Content-Type` guessed from the file extension
///   (`application/octet-stream` when unknown). HTML is sent with
///   `no-cache, no-store, must-revalidate`; every other asset is sent with
///   `public, max-age=31536000, immutable`.
/// * `404 Not Found` (`text/plain`) when the asset is not embedded.
#[cfg(feature = "dashboard")]
async fn static_handler(uri: Uri) -> impl IntoResponse {
    let requested = uri.path().trim_start_matches('/');
    let path = if requested.is_empty() || !requested.contains('.') {
        "index.html"
    } else {
        requested
    };

    match DashboardAssets::get(path) {
        Some(asset) => {
            let mime = mime_guess::from_path(path).first_or_octet_stream();

            // NOTE(review): the one-year `immutable` policy assumes non-HTML
            // asset file names change whenever their content changes; confirm
            // against the dashboard build.
            let cache_control = if path.ends_with(".html") {
                "no-cache, no-store, must-revalidate"
            } else {
                "public, max-age=31536000, immutable"
            };

            Response::builder()
                .status(StatusCode::OK)
                .header(header::CONTENT_TYPE, mime.as_ref())
                .header(header::CACHE_CONTROL, cache_control)
                // Hand the `Cow<'static, [u8]>` to the body as-is: a borrowed
                // (compiled-in) asset is served without copying it per request.
                .body(Body::from(asset.data))
                .expect("status and header values are valid by construction")
        }
        None => Response::builder()
            .status(StatusCode::NOT_FOUND)
            .header(header::CONTENT_TYPE, "text/plain")
            .body(Body::from("Not Found"))
            .expect("static 404 response is valid by construction"),
    }
}
/// `GET /dashboard/data`: builds a [`DashboardData`] snapshot and returns it
/// as JSON with status `200 OK`.
///
/// The snapshot is gathered sequentially: topics first, then consumer groups,
/// then the server counters and the current wall-clock time. Nothing here
/// fails the request; data that cannot be resolved is reported with fallback
/// values (see the helpers below).
#[cfg(feature = "dashboard")]
async fn dashboard_data_handler(State(state): State<DashboardState>) -> impl IntoResponse {
    use std::time::{SystemTime, UNIX_EPOCH};

    let topics = collect_dashboard_topics(&state.topic_manager).await;
    let consumer_groups =
        collect_dashboard_consumer_groups(&state.topic_manager, &state.offset_manager).await;

    let active_connections = state.stats.get_active_connections();
    let total_requests = state.stats.get_total_requests();
    let uptime_secs = state.stats.uptime().as_secs();

    // Milliseconds since the Unix epoch; 0 if the clock reads before the epoch.
    // The narrowing from u128 cannot truncate for any realistic date.
    let timestamp = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0);

    let data = DashboardData {
        topics,
        consumer_groups,
        active_connections,
        total_requests,
        uptime_secs,
        timestamp,
    };
    (StatusCode::OK, Json(data))
}

/// Builds one [`TopicInfo`] per topic name reported by `topic_manager`.
///
/// A topic that is listed but can no longer be fetched is still reported,
/// with one partition, zero messages and no partition offsets.
#[cfg(feature = "dashboard")]
async fn collect_dashboard_topics(topic_manager: &rivven_core::TopicManager) -> Vec<TopicInfo> {
    let topic_list = topic_manager.list_topics().await;
    let mut topics: Vec<TopicInfo> = Vec::new();

    for name in topic_list {
        if let Ok(topic) = topic_manager.get_topic(&name).await {
            // Clamp instead of silently truncating with `as`.
            let num_partitions = u32::try_from(topic.num_partitions()).unwrap_or(u32::MAX);
            let mut partition_offsets = Vec::new();
            let mut total_messages: u64 = 0;

            for p in topic.all_partitions() {
                let earliest = p.earliest_offset().await.unwrap_or(0);
                let latest = p.latest_offset().await;
                let count = latest.saturating_sub(earliest);
                // Saturating, like the subtraction above: an implausible sum
                // must not panic (debug) or wrap (release).
                total_messages = total_messages.saturating_add(count);
                partition_offsets.push(PartitionOffset {
                    partition: p.id(),
                    earliest,
                    latest,
                    count,
                });
            }

            topics.push(TopicInfo {
                name,
                partitions: num_partitions,
                // Placeholder: the replication factor is not read from the topic.
                replication_factor: 1,
                message_count: total_messages,
                partition_offsets,
            });
        } else {
            // Listed but not retrievable: keep the topic visible with
            // fallback values rather than dropping it.
            topics.push(TopicInfo {
                name,
                partitions: 1,
                replication_factor: 1,
                message_count: 0,
                partition_offsets: vec![],
            });
        }
    }

    topics
}

/// Builds one [`ConsumerGroupInfo`] per group reported by `offset_manager`.
///
/// Lag is the sum of `latest - stored offset` (clamped at zero) over every
/// partition the group has an offset for; topics or partitions that can no
/// longer be resolved through `topic_manager` contribute nothing.
#[cfg(feature = "dashboard")]
async fn collect_dashboard_consumer_groups(
    topic_manager: &rivven_core::TopicManager,
    offset_manager: &rivven_core::OffsetManager,
) -> Vec<ConsumerGroupInfo> {
    let groups = offset_manager.list_groups().await;
    let mut consumer_groups: Vec<ConsumerGroupInfo> = Vec::new();

    for group_id in groups {
        let offsets = offset_manager.get_group_offsets(&group_id).await;
        let group_topics: Vec<String> = offsets
            .as_ref()
            .map(|o| o.keys().cloned().collect())
            .unwrap_or_default();

        let mut total_lag: u64 = 0;
        if let Some(ref group_offsets) = offsets {
            for (topic_name, partition_offsets) in group_offsets.iter() {
                if let Ok(topic) = topic_manager.get_topic(topic_name).await {
                    for (partition_id, committed_offset) in partition_offsets.iter() {
                        if let Ok(partition) = topic.partition(*partition_id) {
                            let latest = partition.latest_offset().await;
                            total_lag =
                                total_lag.saturating_add(latest.saturating_sub(*committed_offset));
                        }
                    }
                }
            }
        }

        consumer_groups.push(ConsumerGroupInfo {
            group_id,
            // Placeholders: group state and membership are not read from the
            // offset manager here.
            state: "Stable".to_string(),
            member_count: 0,
            topics: group_topics,
            total_lag,
        });
    }

    consumer_groups
}
#[cfg(all(test, feature = "dashboard"))]
mod tests {
    use super::*;

    /// The dashboard entry page must be part of the embedded asset bundle.
    #[test]
    fn test_embedded_assets_exist() {
        let index_page = DashboardAssets::get("index.html");
        assert!(index_page.is_some(), "index.html should be embedded");
    }

    /// A fully populated snapshot serializes to JSON that carries the topic
    /// and group names as well as the nested field names.
    #[test]
    fn test_dashboard_data_serialization() {
        // (partition id, latest offset); every partition starts at offset 0,
        // so its message count equals its latest offset.
        let partition_offsets: Vec<PartitionOffset> = [(0u32, 500u64), (1, 300), (2, 200)]
            .iter()
            .map(|&(partition, latest)| PartitionOffset {
                partition,
                earliest: 0,
                latest,
                count: latest,
            })
            .collect();

        let topic = TopicInfo {
            name: "test-topic".to_string(),
            partitions: 3,
            replication_factor: 2,
            message_count: 1000,
            partition_offsets,
        };

        let group = ConsumerGroupInfo {
            group_id: "test-group".to_string(),
            state: "Stable".to_string(),
            member_count: 2,
            topics: vec!["test-topic".to_string()],
            total_lag: 100,
        };

        let data = DashboardData {
            topics: vec![topic],
            consumer_groups: vec![group],
            active_connections: 5,
            total_requests: 1000,
            uptime_secs: 3600,
            timestamp: 1706200000000,
        };

        let json = serde_json::to_string(&data).unwrap();

        for needle in ["test-topic", "test-group", "message_count", "partition_offsets"] {
            assert!(json.contains(needle));
        }
    }
}