use axum::{
extract::{Path, State},
http::StatusCode,
response::{
sse::{Event, KeepAlive, Sse},
Html, IntoResponse, Json,
},
routing::{get, post},
Router,
};
use futures_core::Stream;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::Infallible;
use std::sync::Arc;
use tokio::sync::{broadcast, Mutex};
use std::path::PathBuf;
use crate::audit::AuditLogger;
use crate::config::PhalusConfig;
use crate::manifest::cargo::CargoParser;
use crate::manifest::gomod::GoModParser;
use crate::manifest::npm::NpmParser;
use crate::manifest::pypi::PypiParser;
use crate::pipeline::{PipelineConfig, ProgressEvent};
use crate::ParsedManifest;
#[derive(Debug, Clone, Serialize)]
pub struct JobState {
pub status: String,
pub results: Vec<serde_json::Value>,
}
pub struct AppState {
pub progress_tx: broadcast::Sender<ProgressEvent>,
pub jobs: Mutex<HashMap<String, JobState>>,
}
pub fn router(state: Arc<AppState>) -> Router {
Router::new()
.route("/", get(serve_index))
.route("/api/manifest/parse", post(parse_manifest))
.route("/api/health", get(health))
.route("/api/jobs", post(create_job))
.route("/api/jobs/{id}/stream", get(stream_job))
.route("/api/jobs/{id}/download", get(download_job))
.with_state(state)
}
async fn serve_index() -> Html<&'static str> {
Html(include_str!("static/index.html"))
}
fn try_parse_manifest(body: &str) -> Option<ParsedManifest> {
if let Ok(manifest) = NpmParser::parse(body) {
if !manifest.packages.is_empty() {
return Some(manifest);
}
}
if let Ok(manifest) = PypiParser::parse(body) {
if !manifest.packages.is_empty() {
return Some(manifest);
}
}
if let Ok(manifest) = CargoParser::parse(body) {
if !manifest.packages.is_empty() {
return Some(manifest);
}
}
if let Ok(manifest) = GoModParser::parse(body) {
if !manifest.packages.is_empty() {
return Some(manifest);
}
}
None
}
async fn parse_manifest(State(_state): State<Arc<AppState>>, body: String) -> impl IntoResponse {
match try_parse_manifest(&body) {
Some(manifest) => Json(serde_json::to_value(&manifest).unwrap()).into_response(),
None => (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": "could not parse manifest"})),
)
.into_response(),
}
}
async fn health() -> Json<serde_json::Value> {
Json(serde_json::json!({"status": "ok"}))
}
#[derive(Debug, Deserialize)]
struct CreateJobRequest {
manifest_content: String,
license: Option<String>,
isolation: Option<String>,
}
async fn create_job(
State(state): State<Arc<AppState>>,
Json(req): Json<CreateJobRequest>,
) -> impl IntoResponse {
let job_id = uuid::Uuid::new_v4().to_string();
let parsed = match try_parse_manifest(&req.manifest_content) {
Some(p) => p,
None => {
return (
StatusCode::BAD_REQUEST,
Json(serde_json::json!({"error": "could not parse manifest"})),
)
.into_response();
}
};
{
let mut jobs = state.jobs.lock().await;
jobs.insert(
job_id.clone(),
JobState {
status: "running".to_string(),
results: Vec::new(),
},
);
}
let tx = state.progress_tx.clone();
let job_id_clone = job_id.clone();
let license = req.license.unwrap_or_else(|| "mit".to_string());
let isolation = req.isolation.unwrap_or_else(|| "context".to_string());
let state_jobs = Arc::clone(&state) as Arc<AppState>;
tokio::spawn(async move {
tracing::info!(
"Job {} starting with {} packages",
job_id_clone,
parsed.packages.len()
);
let app_config = PhalusConfig::with_env_overrides(PhalusConfig::load().unwrap_or_default());
tracing::info!(
"Job {} config loaded, agent_a_key set: {}",
job_id_clone,
!app_config.llm.agent_a_api_key.is_empty()
);
let pipeline_config = PipelineConfig {
license,
output_dir: PathBuf::from("./phalus-output"),
target_lang: None,
isolation_mode: isolation,
similarity_threshold: 0.70,
concurrency: 3,
dry_run: false,
};
std::fs::create_dir_all(&pipeline_config.output_dir).ok();
let audit_path = pipeline_config.output_dir.join("audit.jsonl");
let audit = Arc::new(tokio::sync::Mutex::new(
AuditLogger::new(audit_path).unwrap(),
));
let mut results = Vec::new();
for pkg in &parsed.packages {
let result = crate::pipeline::run_package(
pkg,
&pipeline_config,
&app_config,
audit.clone(),
Some(tx.clone()),
)
.await;
results.push(serde_json::to_value(&result).unwrap_or_default());
}
let failed = results
.iter()
.filter(|r| !r.get("success").and_then(|v| v.as_bool()).unwrap_or(true))
.count();
let _ = tx.send(ProgressEvent::JobDone {
total: results.len(),
failed,
});
tracing::info!(
"Job {} completed: {} processed, {} failed",
job_id_clone,
results.len(),
failed
);
let mut jobs = state_jobs.jobs.lock().await;
if let Some(job) = jobs.get_mut(&job_id_clone) {
job.status = "completed".to_string();
job.results = results;
}
});
Json(serde_json::json!({"job_id": job_id})).into_response()
}
async fn stream_job(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
) -> impl IntoResponse {
{
let jobs = state.jobs.lock().await;
if !jobs.contains_key(&id) {
return (
StatusCode::NOT_FOUND,
Json(serde_json::json!({"error": "job not found"})),
)
.into_response();
}
}
let rx = state.progress_tx.subscribe();
Sse::new(make_event_stream(rx))
.keep_alive(KeepAlive::default())
.into_response()
}
fn make_event_stream(
mut rx: broadcast::Receiver<ProgressEvent>,
) -> impl Stream<Item = Result<Event, Infallible>> {
async_stream::stream! {
loop {
match rx.recv().await {
Ok(event) => {
let data = serde_json::to_string(&event).unwrap_or_default();
yield Ok(Event::default().data(data));
if matches!(event, ProgressEvent::JobDone { .. }) {
break;
}
}
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => break,
}
}
}
}
async fn download_job(
State(state): State<Arc<AppState>>,
Path(id): Path<String>,
) -> impl IntoResponse {
{
let jobs = state.jobs.lock().await;
match jobs.get(&id) {
Some(job) if job.status == "completed" => {}
Some(_) => return (StatusCode::CONFLICT, "job still running").into_response(),
None => return (StatusCode::NOT_FOUND, "job not found").into_response(),
}
}
let output_dir = std::path::PathBuf::from("./phalus-output");
if !output_dir.exists() {
return (StatusCode::NOT_FOUND, "no output directory").into_response();
}
let mut buf = std::io::Cursor::new(Vec::new());
{
let mut zip = zip::ZipWriter::new(&mut buf);
let options = zip::write::SimpleFileOptions::default()
.compression_method(zip::CompressionMethod::Deflated);
add_dir_to_zip(&mut zip, &output_dir, &output_dir, &options).ok();
zip.finish().ok();
}
let bytes = buf.into_inner();
(
StatusCode::OK,
[
("content-type", "application/zip"),
(
"content-disposition",
"attachment; filename=\"phalus-output.zip\"",
),
],
bytes,
)
.into_response()
}
fn add_dir_to_zip(
zip: &mut zip::ZipWriter<&mut std::io::Cursor<Vec<u8>>>,
base: &std::path::Path,
dir: &std::path::Path,
options: &zip::write::SimpleFileOptions,
) -> std::io::Result<()> {
for entry in std::fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
let rel = path.strip_prefix(base).unwrap_or(&path);
let name = rel.to_string_lossy().to_string();
if path.is_dir() {
add_dir_to_zip(zip, base, &path, options)?;
} else if let Ok(content) = std::fs::read(&path) {
zip.start_file(name, *options).ok();
use std::io::Write;
zip.write_all(&content).ok();
}
}
Ok(())
}