use axum::{
extract::{ConnectInfo, Query, State},
http::{HeaderMap, StatusCode},
middleware::{self, Next},
response::{Html, IntoResponse},
routing::get,
Json, Router,
};
use serde::Deserialize;
use serde_json::json;
use sqlx::SqlitePool;
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Mutex;
use tracing::{info, warn};
use crate::agent;
use crate::config::ModelsConfig;
use crate::events::{EventStore, WriteConsistencyThresholds};
use crate::health::HealthProbeStore;
use crate::heartbeat::HeartbeatTelemetry;
use crate::oauth::OAuthGateway;
use crate::queue_telemetry::QueueTelemetry;
/// Dashboard page markup, embedded at compile time from `dashboard.html` and served at `/`.
const DASHBOARD_HTML: &str = include_str!("dashboard.html");
/// Field name passed to `crate::config::store_in_keychain` when persisting the dashboard token.
const KEYCHAIN_FIELD: &str = "dashboard_token";
/// Token lifetime in seconds (24 hours); `auth_middleware` rejects requests once the token is older.
const TOKEN_TTL_SECS: u64 = 86400;
/// Number of failed auth attempts from one client IP at which `auth_middleware` answers 429.
const MAX_FAILED_ATTEMPTS: u32 = 10;
/// Length in seconds (15 minutes) of the failed-attempt window, measured from the first failure.
const RATE_LIMIT_WINDOW_SECS: u64 = 900;
/// Shared state handed to the dashboard route handlers and to the auth middleware.
#[derive(Clone)]
pub struct DashboardState {
/// SQLite pool queried by the usage, sessions and tasks endpoints.
pub pool: SqlitePool,
/// Source of the write-consistency report; `None` makes that endpoint report it as unavailable.
pub event_store: Option<Arc<EventStore>>,
/// Reported as `provider` by `/api/status`.
pub provider_kind: String,
/// Model configuration reported by `/api/status`.
pub models: ModelsConfig,
/// Start instant used to compute `uptime_secs` in `/api/status`.
pub started_at: Instant,
/// Bearer token that `auth_middleware` compares against the `authorization` header.
pub dashboard_token: String,
/// Creation instant of `dashboard_token`; compared against `TOKEN_TTL_SECS`.
pub token_created_at: Instant,
/// Reported as `daily_token_budget` by `/api/status`.
pub daily_token_budget: Option<u64>,
/// Backing store for the `/api/health/*` endpoints; `None` means health probes are not enabled.
pub health_store: Option<Arc<HealthProbeStore>>,
/// Source of job snapshots for `/api/heartbeat/jobs`; `None` yields empty job lists.
pub heartbeat_telemetry: Option<Arc<HeartbeatTelemetry>>,
/// Handles `/oauth/callback`; `None` makes the callback page say OAuth is not enabled.
pub oauth_gateway: Option<OAuthGateway>,
/// Passed to `agent::policy_autotune_snapshot` by `/api/policy/metrics`.
pub policy_uncertainty_threshold: f32,
/// Passed to the report's `evaluate_gate_with` by `/api/writes/consistency`.
pub write_consistency_thresholds: WriteConsistencyThresholds,
/// Source of queue snapshots for `/api/queues`.
pub queue_telemetry: Arc<QueueTelemetry>,
/// Per-client-IP record of (failed attempt count, instant of the first recorded failure).
pub auth_failures: Arc<Mutex<HashMap<String, (u32, Instant)>>>,
}
/// A dashboard access token together with the instant it was generated.
pub struct DashboardToken {
/// The token string itself.
pub token: String,
/// When the token was created.
pub created_at: Instant,
}
/// Generates a new random dashboard token and makes a best-effort attempt to persist it.
///
/// A fresh UUID v4 string is produced on every call. A failure to store it in the
/// keychain is logged as a warning and otherwise ignored. Only the first eight
/// characters of the token are written to the log.
///
/// # Errors
///
/// The visible implementation always returns `Ok`.
pub fn get_or_create_dashboard_token() -> anyhow::Result<DashboardToken> {
    let token = uuid::Uuid::new_v4().to_string();

    // Persisting is best-effort: the token is still usable for this process if it fails.
    match crate::config::store_in_keychain(KEYCHAIN_FIELD, &token) {
        Ok(_) => {}
        Err(e) => warn!("Could not store dashboard token in keychain: {e}"),
    }

    info!(
        "Dashboard token created (prefix: {}..., expires in 24h)",
        token.get(..8).unwrap_or("????????")
    );

    let created_at = Instant::now();
    Ok(DashboardToken { token, created_at })
}
/// Assembles the dashboard router.
///
/// Every `/api/*` route sits behind `auth_middleware`; `/health`, `/oauth/callback`
/// and `/` are registered without it.
pub fn build_router(state: DashboardState) -> Router {
    let require_token = middleware::from_fn_with_state(state.clone(), auth_middleware);

    // Routes that require a valid bearer token.
    let protected = Router::new()
        .route("/api/status", get(api_status))
        .route("/api/usage", get(api_usage))
        .route("/api/sessions", get(api_sessions))
        .route("/api/tasks", get(api_tasks))
        .route("/api/heartbeat/jobs", get(api_heartbeat_jobs))
        .route("/api/queues", get(api_queues))
        .route("/api/writes/consistency", get(api_writes_consistency))
        .route("/api/policy/metrics", get(api_policy_metrics))
        .route("/api/health/probes", get(api_health_probes))
        .route("/api/health/history", get(api_health_history))
        .route("/api/health/summary", get(api_health_summary))
        .layer(require_token);

    // Routes reachable without a token.
    let public = Router::new()
        .route("/health", get(health_handler))
        .route("/oauth/callback", get(oauth_callback_handler))
        .route("/", get(index_handler));

    public.merge(protected).with_state(state)
}
/// Bearer-token gate for the `/api/*` routes with per-IP failure tracking.
///
/// Checks run in this order:
/// 1. a client IP with at least `MAX_FAILED_ATTEMPTS` failures inside the current
///    window gets `429 Too Many Requests`; an expired window is forgotten;
/// 2. a token older than `TOKEN_TTL_SECS` gets `401 Unauthorized`;
/// 3. a missing or mismatching `authorization: Bearer …` header records a failure
///    for the IP and gets `401 Unauthorized`.
///
/// Otherwise the request is forwarded to the next handler.
async fn auth_middleware(
    State(state): State<DashboardState>,
    ConnectInfo(addr): ConnectInfo<SocketAddr>,
    headers: HeaderMap,
    request: axum::extract::Request,
    next: Next,
) -> Result<impl IntoResponse, StatusCode> {
    let client_ip = addr.ip().to_string();

    // Scoped so the lock is released before any further work.
    {
        let mut failures = state.auth_failures.lock().await;
        if let Some(&(count, first_failure)) = failures.get(&client_ip) {
            let window_open = first_failure.elapsed().as_secs() < RATE_LIMIT_WINDOW_SECS;
            if window_open && count >= MAX_FAILED_ATTEMPTS {
                warn!(ip = %client_ip, "Rate limited: too many failed auth attempts");
                return Err(StatusCode::TOO_MANY_REQUESTS);
            }
            if !window_open {
                failures.remove(&client_ip);
            }
        }
    }

    let token_age_secs = state.token_created_at.elapsed().as_secs();
    if token_age_secs > TOKEN_TTL_SECS {
        warn!("Dashboard token expired (>24h). Restart the daemon to generate a new token.");
        return Err(StatusCode::UNAUTHORIZED);
    }

    // An absent, non-UTF-8, or non-Bearer header is treated as an empty token.
    let presented = match headers.get("authorization").and_then(|v| v.to_str().ok()) {
        Some(value) => value.strip_prefix("Bearer ").unwrap_or(""),
        None => "",
    };

    if constant_time_eq(presented.as_bytes(), state.dashboard_token.as_bytes()) {
        return Ok(next.run(request).await);
    }

    let mut failures = state.auth_failures.lock().await;
    let entry = failures
        .entry(client_ip.clone())
        .or_insert((0, Instant::now()));
    entry.0 += 1;
    warn!(ip = %client_ip, attempts = entry.0, "Failed dashboard auth attempt");
    Err(StatusCode::UNAUTHORIZED)
}
/// Liveness endpoint: always answers `{"status": "ok"}`.
async fn health_handler() -> Json<serde_json::Value> {
    let body = json!({"status": "ok"});
    Json(body)
}
/// Query-string parameters accepted by `/oauth/callback`; every field is optional.
#[derive(Deserialize)]
struct OAuthCallbackParams {
/// `state` query parameter; the handler returns an error page when it is missing.
state: Option<String>,
/// `code` query parameter, forwarded to the gateway's `handle_callback`.
code: Option<String>,
/// `error` query parameter, forwarded to the gateway's `handle_callback`.
error: Option<String>,
}
/// Handles the browser redirect to `/oauth/callback` and renders a small HTML result page.
///
/// Returns a static page when no OAuth gateway is configured or when the `state`
/// query parameter is missing. Otherwise the parameters are passed to the gateway's
/// `handle_callback`, and its message (or error) is HTML-escaped before being
/// placed in the page.
async fn oauth_callback_handler(
    State(state): State<DashboardState>,
    Query(params): Query<OAuthCallbackParams>,
) -> Html<String> {
    let Some(gateway) = &state.oauth_gateway else {
        return Html("<html><body><h2>OAuth not enabled</h2><p>OAuth is not configured on this daemon.</p></body></html>".to_string());
    };

    let Some(state_param) = params.state.as_deref() else {
        return Html(
            "<html><body><h2>Error</h2><p>Missing state parameter.</p></body></html>"
                .to_string(),
        );
    };

    match gateway
        .handle_callback(state_param, params.code.as_deref(), params.error.as_deref())
        .await
    {
        Ok(msg) => {
            let safe_msg = html_escape(&msg);
            // A successful call can still carry a failure message; these keywords
            // select the error presentation.
            let is_error =
                msg.contains("denied") || msg.contains("failed") || msg.contains("expired");
            let (icon, title) = if is_error {
                ("❌", "OAuth Error")
            } else {
                ("✅", "Connected!")
            };
            Html(format!(
                "<html><head><title>{title}</title></head>\
                 <body style=\"font-family:sans-serif;text-align:center;padding:60px\">\
                 <h1>{icon}</h1><h2>{title}</h2><p>{safe_msg}</p>\
                 <p style=\"color:#888\">You can close this tab and return to your chat.</p>\
                 </body></html>"
            ))
        }
        Err(e) => {
            warn!("OAuth callback error: {}", e);
            let safe_err = html_escape(&e.to_string());
            Html(format!(
                "<html><head><title>OAuth Error</title></head>\
                 <body style=\"font-family:sans-serif;text-align:center;padding:60px\">\
                 <h1>❌</h1><h2>OAuth Error</h2><p>{safe_err}</p>\
                 <p style=\"color:#888\">Please try again.</p>\
                 </body></html>"
            ))
        }
    }
}
/// Serves the embedded dashboard page at `/`.
async fn index_handler() -> Html<&'static str> {
    let page: &'static str = DASHBOARD_HTML;
    Html(page)
}
/// `GET /api/status`: provider, configured models, uptime, crate version and token budget.
async fn api_status(State(state): State<DashboardState>) -> Json<serde_json::Value> {
    let uptime_secs = state.started_at.elapsed().as_secs();
    let models = &state.models;

    let model_info = json!({
        "default": models.default_model,
        "fallback": models.fallback_models,
        "primary": models.primary,
        "fast": models.fast,
        "smart": models.smart,
    });

    let body = json!({
        "provider": state.provider_kind,
        "models": model_info,
        "uptime_secs": uptime_secs,
        "version": env!("CARGO_PKG_VERSION"),
        "daily_token_budget": state.daily_token_budget,
    });
    Json(body)
}
/// Query-string parameters for `/api/usage`.
#[derive(Deserialize)]
struct UsageQuery {
/// Look-back window in days; falls back to `default_usage_days()` when omitted.
#[serde(default = "default_usage_days")]
days: u32,
}
/// Serde default for `UsageQuery::days`: one week.
fn default_usage_days() -> u32 {
    const DEFAULT_DAYS: u32 = 7;
    DEFAULT_DAYS
}
/// Compares two byte slices without stopping at the first differing byte.
///
/// Slices of different length are rejected immediately, so only the content
/// comparison of equal-length inputs examines every byte.
fn constant_time_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    // OR together the XOR of every byte pair; any mismatch leaves a non-zero bit.
    let accumulated = a
        .iter()
        .zip(b)
        .fold(0u8, |acc, (lhs, rhs)| acc | (lhs ^ rhs));
    accumulated == 0
}
/// Escapes the five HTML-significant characters so `s` can be embedded in markup.
///
/// `&`, `<`, `>`, `"` and `'` are replaced with their entity forms; every other
/// character is copied unchanged. The input is scanned once, so an `&` that is
/// emitted as part of an entity is never escaped a second time.
fn html_escape(s: &str) -> String {
    let mut escaped = String::with_capacity(s.len());
    for ch in s.chars() {
        match ch {
            '&' => escaped.push_str("&amp;"),
            '<' => escaped.push_str("&lt;"),
            '>' => escaped.push_str("&gt;"),
            '"' => escaped.push_str("&quot;"),
            '\'' => escaped.push_str("&#x27;"),
            other => escaped.push(other),
        }
    }
    escaped
}
async fn api_usage(
State(state): State<DashboardState>,
Query(q): Query<UsageQuery>,
) -> Json<serde_json::Value> {
let days = q.days.min(90);
let rows = sqlx::query_as::<_, UsageRow>(
"SELECT date(created_at) as day, model, \
SUM(input_tokens) as input_tokens, SUM(output_tokens) as output_tokens, \
COUNT(*) as request_count \
FROM token_usage \
WHERE created_at >= datetime('now', '-' || ? || ' days') \
GROUP BY day, model ORDER BY day DESC, model",
)
.bind(days)
.fetch_all(&state.pool)
.await
.unwrap_or_default();
let vals: Vec<serde_json::Value> = rows
.into_iter()
.map(|r| {
json!({
"day": r.day,
"model": r.model,
"input_tokens": r.input_tokens,
"output_tokens": r.output_tokens,
"request_count": r.request_count,
})
})
.collect();
Json(serde_json::Value::Array(vals))
}
/// One aggregated row of the `/api/usage` query (grouped by day and model).
#[derive(sqlx::FromRow)]
struct UsageRow {
/// `date(created_at)` of the group.
day: Option<String>,
/// Model name of the group.
model: String,
/// `SUM(input_tokens)` for the group.
input_tokens: i64,
/// `SUM(output_tokens)` for the group.
output_tokens: i64,
/// `COUNT(*)` for the group.
request_count: i64,
}
/// Query-string parameters for `/api/sessions`.
#[derive(Deserialize)]
struct SessionsQuery {
/// Maximum number of sessions to return; falls back to `default_sessions_limit()` when omitted.
#[serde(default = "default_sessions_limit")]
limit: u32,
}
/// Serde default for `SessionsQuery::limit`.
fn default_sessions_limit() -> u32 {
    const DEFAULT_LIMIT: u32 = 20;
    DEFAULT_LIMIT
}
async fn api_sessions(
State(state): State<DashboardState>,
Query(q): Query<SessionsQuery>,
) -> Json<serde_json::Value> {
let limit = q.limit.min(100);
let rows = sqlx::query_as::<_, SessionRow>(
"SELECT session_id, MAX(created_at) as last_activity, \
COUNT(*) as message_count, MIN(created_at) as first_message \
FROM events \
WHERE event_type IN ('user_message', 'assistant_response', 'tool_result') \
GROUP BY session_id \
ORDER BY last_activity DESC LIMIT ?",
)
.bind(limit)
.fetch_all(&state.pool)
.await
.unwrap_or_default();
let vals: Vec<serde_json::Value> = rows
.into_iter()
.map(|r| {
json!({
"session_id": r.session_id,
"last_activity": r.last_activity,
"message_count": r.message_count,
"first_message": r.first_message,
})
})
.collect();
Json(serde_json::Value::Array(vals))
}
/// One row of the `/api/sessions` query (grouped by session).
#[derive(sqlx::FromRow)]
struct SessionRow {
/// Session identifier the events were grouped by.
session_id: String,
/// `MAX(created_at)` across the session's counted events.
last_activity: Option<String>,
/// `COUNT(*)` of the session's counted events.
message_count: i64,
/// `MIN(created_at)` across the session's counted events.
first_message: Option<String>,
}
async fn api_tasks(State(state): State<DashboardState>) -> Json<serde_json::Value> {
let rows = sqlx::query_as::<_, ScheduleRow>(
"SELECT
s.id AS schedule_id,
s.goal_id AS goal_id,
g.description AS goal_description,
g.status AS goal_status,
g.domain AS goal_domain,
s.original_schedule AS original_schedule,
s.cron_expr AS cron_expr,
s.fire_policy AS fire_policy,
s.is_one_shot AS is_one_shot,
s.is_paused AS is_paused,
s.last_run_at AS last_run_at,
s.next_run_at AS next_run_at
FROM goal_schedules s
JOIN goals g ON g.id = s.goal_id
ORDER BY s.next_run_at ASC",
)
.fetch_all(&state.pool)
.await
.unwrap_or_default();
let vals: Vec<serde_json::Value> = rows
.into_iter()
.map(|r| {
json!({
"schedule_id": r.schedule_id,
"goal_id": r.goal_id,
"goal_description": r.goal_description,
"goal_status": r.goal_status,
"goal_domain": r.goal_domain,
"original_schedule": r.original_schedule,
"cron_expr": r.cron_expr,
"fire_policy": r.fire_policy,
"is_one_shot": r.is_one_shot != 0,
"is_paused": r.is_paused != 0,
"last_run_at": r.last_run_at,
"next_run_at": r.next_run_at,
})
})
.collect();
Json(serde_json::Value::Array(vals))
}
/// One row of the `/api/tasks` query: a `goal_schedules` record joined with its `goals` record.
#[derive(sqlx::FromRow)]
struct ScheduleRow {
/// `goal_schedules.id`.
schedule_id: String,
/// `goal_schedules.goal_id`.
goal_id: String,
/// `goals.description`.
goal_description: String,
/// `goals.status`.
goal_status: String,
/// `goals.domain`.
goal_domain: String,
/// `goal_schedules.original_schedule`.
original_schedule: Option<String>,
/// `goal_schedules.cron_expr`.
cron_expr: String,
/// `goal_schedules.fire_policy`.
fire_policy: String,
/// `goal_schedules.is_one_shot`; non-zero is reported as `true`.
is_one_shot: i64,
/// `goal_schedules.is_paused`; non-zero is reported as `true`.
is_paused: i64,
/// `goal_schedules.last_run_at`.
last_run_at: Option<String>,
/// `goal_schedules.next_run_at`; also the sort key of the query.
next_run_at: String,
}
/// `GET /api/heartbeat/jobs`: all heartbeat job snapshots plus the maintenance subset.
///
/// Without heartbeat telemetry the response carries an `error` field and two
/// empty lists.
async fn api_heartbeat_jobs(State(state): State<DashboardState>) -> Json<serde_json::Value> {
    /// True for the job names that are reported under `maintenance_jobs`.
    fn is_maintenance_job(name: &str) -> bool {
        matches!(
            name,
            "embeddings"
                | "consolidation"
                | "memory_decay"
                | "event_pruning"
                | "event_task_reconciliation"
                | "retention_cleanup"
        )
    }

    let telemetry = match state.heartbeat_telemetry.as_ref() {
        Some(t) => t,
        None => {
            return Json(json!({
                "error": "Heartbeat telemetry unavailable",
                "jobs": [],
                "maintenance_jobs": []
            }));
        }
    };

    let all_jobs = telemetry.snapshots();
    let maintenance_jobs: Vec<_> = all_jobs
        .iter()
        .filter(|job| is_maintenance_job(job.name.as_str()))
        .cloned()
        .collect();

    Json(json!({
        "jobs": all_jobs,
        "maintenance_jobs": maintenance_jobs
    }))
}
/// `GET /api/queues`: current queue telemetry snapshots under the `queues` key.
async fn api_queues(State(state): State<DashboardState>) -> Json<serde_json::Value> {
    let snapshots = state.queue_telemetry.snapshots();
    let body = json!({ "queues": snapshots });
    Json(body)
}
/// `GET /api/writes/consistency`: write-consistency report plus its gate evaluation.
///
/// Responds with an `error` field when no event store is configured or when the
/// report cannot be built (the latter is also logged).
async fn api_writes_consistency(State(state): State<DashboardState>) -> Json<serde_json::Value> {
    let store = match state.event_store.as_ref() {
        Some(s) => s,
        None => {
            return Json(json!({
                "error": "Event store unavailable"
            }));
        }
    };

    let report = match store.write_consistency_report(10).await {
        Ok(r) => r,
        Err(e) => {
            warn!(error = %e, "Failed to build write consistency report");
            return Json(json!({
                "error": format!("failed to generate report: {}", e)
            }));
        }
    };

    let gate = report.evaluate_gate_with(state.write_consistency_thresholds);
    Json(json!({
        "generated_at": report.generated_at,
        "conversation_event_rows": report.conversation_event_rows,
        "missing_message_id_events": report.missing_message_id_events,
        "global_delta": report.global_delta,
        "session_mismatch_count": report.session_mismatch_count,
        "stale_task_starts": report.stale_task_starts,
        "top_session_drifts": report.top_session_drifts,
        "gate": gate
    }))
}
/// `GET /api/policy/metrics`: agent policy counters plus derived tool-exposure averages.
///
/// The two averages are reported as `0.0` when no tool-exposure samples exist.
async fn api_policy_metrics(State(state): State<DashboardState>) -> Json<serde_json::Value> {
    let metrics = agent::policy_metrics_snapshot();
    let autotune = agent::policy_autotune_snapshot(state.policy_uncertainty_threshold);

    // Both averages share the same denominator, so guard against zero once.
    let samples = metrics.tool_exposure_samples;
    let (avg_tools_before, avg_tools_after) = if samples > 0 {
        (
            metrics.tool_exposure_before_sum as f64 / samples as f64,
            metrics.tool_exposure_after_sum as f64 / samples as f64,
        )
    } else {
        (0.0, 0.0)
    };

    let body = json!({
        "tool_exposure_samples": metrics.tool_exposure_samples,
        "avg_tools_before_filter": avg_tools_before,
        "avg_tools_after_filter": avg_tools_after,
        "ambiguity_detected_total": metrics.ambiguity_detected_total,
        "uncertainty_clarify_total": metrics.uncertainty_clarify_total,
        "uncertainty_threshold": autotune.uncertainty_threshold,
        "context_refresh_total": metrics.context_refresh_total,
        "escalation_total": metrics.escalation_total,
        "fallback_expansion_total": metrics.fallback_expansion_total,
        "response_direct_return_total": metrics.response_direct_return_total,
        "response_fallthrough_total": metrics.response_fallthrough_total,
        "orchestration_route_clarification_required_total": metrics.orchestration_route_clarification_required_total,
        "orchestration_route_tools_required_total": metrics.orchestration_route_tools_required_total,
        "orchestration_route_short_correction_direct_reply_total": metrics.orchestration_route_short_correction_direct_reply_total,
        "orchestration_route_acknowledgment_direct_reply_total": metrics.orchestration_route_acknowledgment_direct_reply_total,
        "orchestration_route_default_continue_total": metrics.orchestration_route_default_continue_total,
        "tool_schema_contract_rejections_total": metrics.tool_schema_contract_rejections_total,
        "route_drift_alert_total": metrics.route_drift_alert_total,
        "route_drift_failsafe_activation_total": metrics.route_drift_failsafe_activation_total,
        "route_failsafe_active_turn_total": metrics.route_failsafe_active_turn_total,
        "tokens_failed_tasks_total": metrics.tokens_failed_tasks_total,
        "no_progress_iterations_total": metrics.no_progress_iterations_total,
        "deferred_no_tool_forced_required_total": metrics.deferred_no_tool_forced_required_total,
        "deferred_no_tool_deferral_detected_total": metrics.deferred_no_tool_deferral_detected_total,
        "deferred_no_tool_model_switch_total": metrics.deferred_no_tool_model_switch_total,
        "deferred_no_tool_error_marker_total": metrics.deferred_no_tool_error_marker_total,
        "llm_payload_invalid_total": metrics.llm_payload_invalid_total,
        "llm_payload_invalid_breakdown": metrics.llm_payload_invalid_breakdown
    });
    Json(body)
}
/// `GET /api/health/probes`: every configured probe with its latest result and failure streak.
///
/// Without a health store the response is an object with an `error` field and an
/// empty `probes` list; otherwise it is a bare JSON array. Store errors are
/// treated as "no data" for the affected value.
async fn api_health_probes(State(state): State<DashboardState>) -> Json<serde_json::Value> {
    let store = match state.health_store.as_ref() {
        Some(s) => s,
        None => {
            return Json(json!({
                "error": "Health probes not enabled",
                "probes": []
            }));
        }
    };

    let mut entries = Vec::new();
    for probe in store.list_probes().await.unwrap_or_default() {
        let latest = store.get_latest_result(&probe.id).await.ok().flatten();
        let consecutive_failures = store
            .count_consecutive_failures(&probe.id)
            .await
            .unwrap_or(0);

        let entry = json!({
            "id": probe.id,
            "name": probe.name,
            "description": probe.description,
            "type": probe.probe_type.as_str(),
            "target": probe.target,
            "schedule": probe.schedule,
            "source": probe.source,
            "is_paused": probe.is_paused,
            "consecutive_failures_alert": probe.consecutive_failures_alert,
            "latency_threshold_ms": probe.latency_threshold_ms,
            "last_run_at": probe.last_run_at.map(|t| t.to_rfc3339()),
            "next_run_at": probe.next_run_at.to_rfc3339(),
            "last_status": latest.as_ref().map(|r| r.status.as_str()),
            "last_latency_ms": latest.as_ref().and_then(|r| r.latency_ms),
            "last_checked": latest.as_ref().map(|r| r.checked_at.to_rfc3339()),
            "consecutive_failures": consecutive_failures,
        });
        entries.push(entry);
    }
    Json(serde_json::Value::Array(entries))
}
/// Query-string parameters for `/api/health/history`.
#[derive(Deserialize)]
struct HealthHistoryQuery {
/// Probe identifier; the handler tries it as an id first and then as a name.
probe: String,
/// Look-back window in hours; falls back to `default_history_hours()` when omitted.
#[serde(default = "default_history_hours")]
hours: u32,
}
/// Serde default for `HealthHistoryQuery::hours`: one day.
fn default_history_hours() -> u32 {
    const DEFAULT_HOURS: u32 = 24;
    DEFAULT_HOURS
}
/// `GET /api/health/history?probe=ID_OR_NAME&hours=N`: results for one probe in a recent window.
///
/// The `probe` parameter is looked up as an id first and, failing that, as a
/// name. `hours` is capped at 168 (one week) and the window ends at the current
/// UTC time. An unknown probe or a missing health store produces an `error`
/// field with an empty `results` list.
async fn api_health_history(
    State(state): State<DashboardState>,
    Query(q): Query<HealthHistoryQuery>,
) -> Json<serde_json::Value> {
    let Some(store) = &state.health_store else {
        return Json(json!({
            "error": "Health probes not enabled",
            "results": []
        }));
    };

    // A lookup error on the id path is treated like "not found" so the name
    // lookup still gets a chance.
    let probe_id = match store.get_probe(&q.probe).await.ok().flatten() {
        Some(p) => p.id,
        None => match store.get_probe_by_name(&q.probe).await {
            Ok(Some(p)) => p.id,
            _ => {
                return Json(json!({
                    "error": format!("Probe not found: {}", q.probe),
                    "results": []
                }));
            }
        },
    };

    let hours = q.hours.min(168);
    let end = chrono::Utc::now();
    let start = end - chrono::Duration::hours(i64::from(hours));

    let results = store
        .get_results_in_range(&probe_id, start, end)
        .await
        .unwrap_or_default();
    let result_data: Vec<serde_json::Value> = results
        .iter()
        .map(|r| {
            json!({
                "id": r.id,
                "status": r.status.as_str(),
                "latency_ms": r.latency_ms,
                "error_message": r.error_message,
                "checked_at": r.checked_at.to_rfc3339(),
            })
        })
        .collect();

    Json(json!({
        "probe_id": probe_id,
        "hours": hours,
        "count": result_data.len(),
        "results": result_data
    }))
}
/// `GET /api/health/summary`: per-probe health status plus aggregate counters.
///
/// Paused probes are counted separately and excluded from the overall score.
/// A probe with no recorded result counts as healthy. The overall score is the
/// truncated percentage of healthy probes among non-paused ones, or 100 when
/// there are no non-paused probes.
async fn api_health_summary(State(state): State<DashboardState>) -> Json<serde_json::Value> {
    let store = match state.health_store.as_ref() {
        Some(s) => s,
        None => {
            return Json(json!({
                "error": "Health probes not enabled",
                "total": 0,
                "healthy": 0,
                "unhealthy": 0,
                "paused": 0,
                "probes": []
            }));
        }
    };

    let probes = store.list_probes().await.unwrap_or_default();
    let stats_map = store.get_all_probe_stats(24).await.unwrap_or_default();

    let (mut total, mut healthy, mut unhealthy, mut paused) = (0u32, 0u32, 0u32, 0u32);
    let mut probe_summaries = Vec::new();

    for probe in probes {
        total += 1;

        if probe.is_paused {
            paused += 1;
            probe_summaries.push(json!({
                "id": probe.id,
                "name": probe.name,
                "status": "paused",
                "health_score": null,
            }));
            continue;
        }

        let latest = store.get_latest_result(&probe.id).await.ok().flatten();
        let is_healthy = latest.as_ref().map_or(true, |r| r.status.is_healthy());
        let status_label = if is_healthy {
            healthy += 1;
            "healthy"
        } else {
            unhealthy += 1;
            "unhealthy"
        };

        let stats = stats_map.get(&probe.id);
        let health_score = stats.map(crate::health::TrendAnalyzer::health_score);
        let uptime = stats.map(|s| s.uptime_percent);
        let avg_latency = stats.and_then(|s| s.avg_latency_ms);

        probe_summaries.push(json!({
            "id": probe.id,
            "name": probe.name,
            "status": status_label,
            "health_score": health_score,
            "uptime_percent": uptime,
            "avg_latency_ms": avg_latency,
        }));
    }

    // `paused` never exceeds `total`, so this subtraction cannot underflow.
    let active = total - paused;
    let overall_score = if active > 0 {
        ((healthy as f64 / active as f64) * 100.0) as u32
    } else {
        100
    };

    Json(json!({
        "total": total,
        "healthy": healthy,
        "unhealthy": unhealthy,
        "paused": paused,
        "overall_health_score": overall_score,
        "probes": probe_summaries
    }))
}
pub async fn start_dashboard_server(
state: DashboardState,
port: u16,
bind_addr: &str,
) -> anyhow::Result<()> {
let app = build_router(state).into_make_service_with_connect_info::<SocketAddr>();
let ip: std::net::IpAddr = bind_addr
.parse()
.unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
let addr = std::net::SocketAddr::new(ip, port);
info!("Dashboard server listening on http://{}", addr);
let listener = tokio::net::TcpListener::bind(addr).await?;
axum::serve(listener, app).await?;
Ok(())
}