use std::net::SocketAddr;
use std::sync::Arc;
use crate::core::IndexSummary;
use crate::service::events::{AnalyzerAppState, ApiError};
use anyhow::Result;
use axum::{
extract::State,
http::StatusCode,
response::{IntoResponse, Json, Redirect},
routing::{delete, get, post},
Router,
};
use futures::StreamExt;
use serde::Serialize;
use tokio_stream::wrappers::BroadcastStream;
use super::handlers;
use super::ui;
pub fn build_router(state: AnalyzerAppState) -> Router {
let router = Router::new()
.route("/", get(|| async { Redirect::permanent("/ui/") }))
.route("/health", get(health))
.route("/sse", get(sse_handler))
.route("/indexes", get(list_indexes))
.route(
"/indexes/{id}/complexity_hotspots",
get(handlers::analysis::complexity_hotspots),
)
.route("/indexes/{id}/smells", get(handlers::analysis::smells))
.route(
"/indexes/{id}/refactor-suggestions",
get(handlers::analysis::refactor_suggestions),
)
.route(
"/indexes/{id}/quality",
get(handlers::analysis::quality_report),
)
.route(
"/indexes/{id}/diagnostics",
get(handlers::analysis::diagnostics_for_index),
)
.route("/indexes/{id}/graph", get(handlers::graph::graph_for_index))
.route(
"/indexes/{id}/entities",
get(handlers::graph::entities_for_index),
)
.route(
"/indexes/{id}/clusters",
get(handlers::graph::clusters_for_index),
)
.route("/indexes/{id}/ner", get(handlers::graph::ner_for_index))
.route("/indexes/{id}/scip", post(handlers::graph::ingest_scip))
.route("/review", post(handlers::review::review_diff_handler))
.route(
"/review/github-pr",
post(handlers::review::review_github_pr_handler),
)
.route("/analyze/deep", post(handlers::deep::deep_analyze_handler))
.route(
"/webhooks/github",
post(handlers::review::github_webhook_handler),
)
.route(
"/facts",
get(handlers::facts::list_facts).post(handlers::facts::upsert_fact),
)
.route("/facts/{id}", delete(handlers::facts::delete_fact))
.route("/ui", get(|| async { Redirect::permanent("/ui/") }))
.route("/ui/", get(ui::ui_index_handler))
.route("/ui/{*path}", get(ui::ui_asset_handler))
.with_state(Arc::new(state));
trusty_common::server::with_standard_middleware(router)
}
/// Bind a loopback listener, publish its address, and serve until shutdown.
///
/// Binding starts at `127.0.0.1:start_port` and is delegated to
/// `trusty_common::bind_with_auto_port`; the address actually obtained is
/// recorded via `trusty_common::write_daemon_addr` and logged. The server then
/// runs until `trusty_common::shutdown_signal()` resolves.
///
/// When serving ends — cleanly or with an error — the `http_addr` file in the
/// service data directory is removed on a best-effort basis so a stale
/// address is not left behind.
///
/// # Errors
///
/// Returns an error if binding, reading the local address, or writing the
/// daemon address fails, or if the server itself terminates with an error.
pub async fn serve(state: AnalyzerAppState, start_port: u16) -> Result<()> {
    let start_addr: SocketAddr = ([127, 0, 0, 1], start_port).into();
    // NOTE(review): 64 is presumably the number of ports to probe — confirm
    // against `bind_with_auto_port`.
    let listener = trusty_common::bind_with_auto_port(start_addr, 64).await?;
    let actual = listener.local_addr()?;
    trusty_common::write_daemon_addr("trusty-analyze", &actual.to_string())?;
    tracing::info!("trusty-analyze listening on http://{actual}");

    let app = build_router(state);
    // Hold on to the outcome instead of using `?` here: an early return would
    // skip the cleanup below and leave the address file pointing at a dead
    // server.
    let served = axum::serve(listener, app)
        .with_graceful_shutdown(trusty_common::shutdown_signal())
        .await;

    // Best-effort cleanup; failures are deliberately ignored.
    if let Ok(dir) = trusty_common::resolve_data_dir("trusty-analyze") {
        let _ = std::fs::remove_file(dir.join("http_addr"));
    }

    served?;
    Ok(())
}
/// JSON body returned by the `/health` endpoint.
#[derive(Serialize)]
struct HealthResponse {
/// `"ok"` when the search backend is reachable, `"degraded"` otherwise.
status: &'static str,
/// Crate version taken from `CARGO_PKG_VERSION` at compile time.
version: &'static str,
/// Whether the search health probe reported success.
search_reachable: bool,
}
/// Health probe for the service.
///
/// Asks the search client for its health; a failed probe counts as
/// unreachable. Responds with `status: "ok"` on success, or with
/// `503 Service Unavailable` and `status: "degraded"` when the search backend
/// cannot be reached. Both responses carry the same JSON shape.
async fn health(
    State(state): State<Arc<AnalyzerAppState>>,
) -> Result<Json<HealthResponse>, (StatusCode, Json<HealthResponse>)> {
    let search_reachable = state.search.health().await.unwrap_or(false);

    // Same body shape in both outcomes; only the status label differs.
    let report = |status: &'static str| HealthResponse {
        status,
        version: env!("CARGO_PKG_VERSION"),
        search_reachable,
    };

    if !search_reachable {
        return Err((StatusCode::SERVICE_UNAVAILABLE, Json(report("degraded"))));
    }
    Ok(Json(report("ok")))
}
async fn list_indexes(
State(state): State<Arc<AnalyzerAppState>>,
) -> Result<Json<Vec<IndexSummary>>, ApiError> {
state.search.list_indexes().await.map(Json).map_err(|e| {
tracing::warn!("list_indexes proxy failed: {e:#}");
ApiError::bad_gateway(format!("upstream search daemon: {e:#}"))
})
}
/// Stream service events to the client as Server-Sent Events.
///
/// The response starts with a single `{"type":"connected"}` frame and then
/// forwards every event received from `state.events`, each serialized as JSON
/// in its own `data:` frame. Two conditions are reported in-band rather than
/// ending the stream:
///
/// * an event that fails to serialize yields a `{"type":"error", ...}` frame;
/// * a lagging receiver yields a `{"type":"lag","skipped":n}` frame.
async fn sse_handler(State(state): State<Arc<AnalyzerAppState>>) -> impl IntoResponse {
    let rx = state.events.subscribe();

    let initial = futures::stream::once(async {
        Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(
            "data: {\"type\":\"connected\"}\n\n",
        ))
    });

    let events = BroadcastStream::new(rx).map(|res| {
        let frame = match res {
            Ok(event) => match serde_json::to_string(&event) {
                Ok(json) => format!("data: {json}\n\n"),
                Err(e) => {
                    // Encode the message as a JSON string so quotes,
                    // backslashes or newlines in the error text cannot
                    // produce invalid JSON or break the SSE frame boundary.
                    let message = serde_json::Value::String(e.to_string());
                    format!("data: {{\"type\":\"error\",\"message\":{message}}}\n\n")
                }
            },
            Err(tokio_stream::wrappers::errors::BroadcastStreamRecvError::Lagged(n)) => {
                format!("data: {{\"type\":\"lag\",\"skipped\":{n}}}\n\n")
            }
        };
        Ok::<axum::body::Bytes, std::io::Error>(axum::body::Bytes::from(frame))
    });

    let stream = initial.chain(events);

    axum::response::Response::builder()
        .header("Content-Type", "text/event-stream")
        .header("Cache-Control", "no-cache")
        // Asks buffering reverse proxies to pass frames through unbuffered.
        .header("X-Accel-Buffering", "no")
        .body(axum::body::Body::from_stream(stream))
        // Only static, well-formed header names and values are used above.
        .expect("valid SSE response")
}