ubermind-daemon 0.6.7

Daemon process supervisor for ubermind - v2 alpha with native supervision
use crate::supervisor::Supervisor;
use axum::extract::ws::{Message, WebSocket, WebSocketUpgrade};
use axum::extract::{Path, State};
use axum::http::{header, StatusCode, Uri};
use axum::response::{IntoResponse, Response};
use axum::routing::{get, post};
use axum::{Json, Router};
use rust_embed::RustEmbed;
use serde::Serialize;
use std::collections::HashMap;
use std::sync::Arc;
use tower_http::cors::CorsLayer;

/// Handle to the web UI's static files, provided through the `RustEmbed`
/// derive with `../../ui/build/` as the asset folder.
///
/// Assets are looked up by relative path with `UiAssets::get(path)`; see
/// `static_handler` for how requests are mapped onto them.
#[derive(RustEmbed)]
#[folder = "../../ui/build/"]
struct UiAssets;

/// State shared with every request handler through axum's `State` extractor.
///
/// It only holds an `Arc`, so cloning it is a reference-count bump.
#[derive(Clone)]
pub struct AppState {
	/// The supervisor that all API handlers query and control.
	pub supervisor: Arc<Supervisor>,
}

/// Assembles the daemon's HTTP router.
///
/// Registers the JSON service/process API under `/api/services`, the plain
/// text and WebSocket output endpoints, and a fallback that serves the
/// embedded UI. A permissive CORS layer wraps everything, and the given
/// `supervisor` is installed as shared handler state.
pub fn router(supervisor: Arc<Supervisor>) -> Router {
	// Service-level endpoints: listing, detail and lifecycle actions.
	let app = Router::new()
		.route("/api/services", get(list_services))
		.route("/api/services/{name}", get(service_detail))
		.route("/api/services/{name}/start", post(start_service))
		.route("/api/services/{name}/stop", post(stop_service))
		.route("/api/services/{name}/reload", post(reload_service));

	// Actions that target a single process inside a service.
	let app = app
		.route(
			"/api/services/{name}/processes/{process}/restart",
			post(restart_process),
		)
		.route(
			"/api/services/{name}/processes/{process}/kill",
			post(kill_process),
		);

	// Captured output: one-shot text dump and live WebSocket stream.
	let app = app
		.route("/api/services/{name}/echo", get(echo_service))
		.route("/ws/echo/{name}", get(ws_echo));

	// Anything else is answered by the embedded UI.
	app.fallback(static_handler)
		.layer(CorsLayer::permissive())
		.with_state(AppState { supervisor })
}

/// One entry of the JSON array returned by `list_services`.
#[derive(Serialize)]
struct ServiceInfo {
	/// Service name as reported by the supervisor.
	name: String,
	/// Service directory, converted lossily to a string.
	dir: String,
	/// Result of the supervisor status' `is_running()` for this service.
	running: bool,
}

/// JSON body returned by `service_detail`: the service summary plus its
/// processes.
#[derive(Serialize)]
struct ServiceDetail {
	/// Service name as reported by the supervisor.
	name: String,
	/// Service directory, converted lossily to a string.
	dir: String,
	/// Result of the supervisor status' `is_running()` for this service.
	running: bool,
	/// One entry per process listed in the service's status.
	processes: Vec<ProcessInfo>,
}

/// JSON description of a single process, as embedded in `ServiceDetail`.
#[derive(Serialize)]
struct ProcessInfo {
	/// Process name.
	name: String,
	/// Process id as reported by the supervisor, if any.
	pid: Option<u32>,
	/// Human-readable state text built by `service_detail`, e.g. `"stopped"`
	/// or `"running (pid 42, 10s)"`.
	status: String,
	/// Autostart flag copied from the supervisor's process status.
	autostart: bool,
	/// Port list copied from the supervisor's process status.
	ports: Vec<u16>,
}

/// JSON body returned when a start/stop/reload/restart/kill action succeeds.
#[derive(Serialize)]
struct ActionResponse {
	/// Success message produced by the supervisor call.
	message: String,
}

/// JSON body returned alongside an error status code.
#[derive(Serialize)]
struct ErrorResponse {
	/// Error text, either from the supervisor or built by the handler.
	error: String,
}

/// `GET /api/services` — returns a summary (name, directory, running flag)
/// for every service the supervisor reports.
async fn list_services(State(state): State<AppState>) -> Json<Vec<ServiceInfo>> {
	let statuses = state.supervisor.status().await;

	let mut summaries = Vec::new();
	for svc in statuses.iter() {
		summaries.push(ServiceInfo {
			name: svc.name.clone(),
			dir: svc.dir.to_string_lossy().to_string(),
			running: svc.is_running(),
		});
	}

	Json(summaries)
}

/// `GET /api/services/{name}` — returns the named service together with a
/// per-process breakdown.
///
/// # Errors
///
/// Responds with `404 Not Found` and a JSON error body when the supervisor
/// reports no service called `name`.
async fn service_detail(
	State(state): State<AppState>,
	Path(name): Path<String>,
) -> Result<Json<ServiceDetail>, (StatusCode, Json<ErrorResponse>)> {
	use ubermind_core::types::ProcessState;

	let statuses = state.supervisor.status().await;
	let Some(status) = statuses.into_iter().find(|s| s.name == name) else {
		let body = ErrorResponse {
			error: format!("service not found: {}", name),
		};
		return Err((StatusCode::NOT_FOUND, Json(body)));
	};

	// Read the flag before `processes` is moved out of `status` below.
	let running = status.is_running();

	let mut processes = Vec::new();
	for p in status.processes {
		// Render the process state as the short text shown to API clients.
		let status_str = match &p.state {
			ProcessState::Running { pid, uptime_secs } => {
				format!("running (pid {}, {}s)", pid, uptime_secs)
			}
			ProcessState::Stopped => "stopped".to_string(),
			ProcessState::Crashed { exit_code, retries } => {
				format!("crashed (exit {}, retry {})", exit_code, retries)
			}
			ProcessState::Failed { exit_code } => {
				format!("failed (exit {})", exit_code)
			}
		};
		processes.push(ProcessInfo {
			name: p.name,
			pid: p.pid,
			status: status_str,
			autostart: p.autostart,
			ports: p.ports,
		});
	}

	let detail = ServiceDetail {
		name: status.name,
		dir: status.dir.to_string_lossy().to_string(),
		running,
		processes,
	};
	Ok(Json(detail))
}

async fn start_service(
	State(state): State<AppState>,
	Path(name): Path<String>,
	axum::extract::Query(params): axum::extract::Query<HashMap<String, String>>,
) -> Result<Json<ActionResponse>, (StatusCode, Json<ErrorResponse>)> {
	let all = params.get("all").map(|v| v == "true" || v == "1").unwrap_or(false);
	state
		.supervisor
		.start_service_filtered(&name, all, &[])
		.await
		.map(|msg| Json(ActionResponse { message: msg }))
		.map_err(|e| {
			(
				StatusCode::BAD_REQUEST,
				Json(ErrorResponse { error: e }),
			)
		})
}

async fn stop_service(
	State(state): State<AppState>,
	Path(name): Path<String>,
) -> Result<Json<ActionResponse>, (StatusCode, Json<ErrorResponse>)> {
	state
		.supervisor
		.stop_service(&name)
		.await
		.map(|msg| Json(ActionResponse { message: msg }))
		.map_err(|e| {
			(
				StatusCode::BAD_REQUEST,
				Json(ErrorResponse { error: e }),
			)
		})
}

async fn reload_service(
	State(state): State<AppState>,
	Path(name): Path<String>,
) -> Result<Json<ActionResponse>, (StatusCode, Json<ErrorResponse>)> {
	state
		.supervisor
		.reload_service_filtered(&name, false, &[])
		.await
		.map(|msg| Json(ActionResponse { message: msg }))
		.map_err(|e| {
			(
				StatusCode::BAD_REQUEST,
				Json(ErrorResponse { error: e }),
			)
		})
}

async fn restart_process(
	State(state): State<AppState>,
	Path((name, process)): Path<(String, String)>,
) -> Result<Json<ActionResponse>, (StatusCode, Json<ErrorResponse>)> {
	state
		.supervisor
		.restart_process(&name, &process)
		.await
		.map(|msg| Json(ActionResponse { message: msg }))
		.map_err(|e| {
			(
				StatusCode::BAD_REQUEST,
				Json(ErrorResponse { error: e }),
			)
		})
}

async fn kill_process(
	State(state): State<AppState>,
	Path((name, process)): Path<(String, String)>,
) -> Result<Json<ActionResponse>, (StatusCode, Json<ErrorResponse>)> {
	state
		.supervisor
		.kill_process(&name, &process)
		.await
		.map(|msg| Json(ActionResponse { message: msg }))
		.map_err(|e| {
			(
				StatusCode::BAD_REQUEST,
				Json(ErrorResponse { error: e }),
			)
		})
}

async fn echo_service(
	State(state): State<AppState>,
	Path(name): Path<String>,
) -> Result<String, (StatusCode, Json<ErrorResponse>)> {
	let outputs = state.supervisor.get_all_outputs(&name).await.map_err(|e| {
		(
			StatusCode::NOT_FOUND,
			Json(ErrorResponse { error: e }),
		)
	})?;

	let mut result = String::new();
	for (proc_name, capture) in outputs {
		if !result.is_empty() {
			result.push_str(&format!("\n--- {} ---\n", proc_name));
		}
		let snapshot = capture.snapshot().await;
		result.push_str(&String::from_utf8_lossy(&snapshot));
	}
	Ok(result)
}

async fn ws_echo(
	State(state): State<AppState>,
	Path(name): Path<String>,
	ws: WebSocketUpgrade,
) -> impl IntoResponse {
	ws.on_upgrade(move |socket| handle_ws_echo(socket, state, name))
}

async fn handle_ws_echo(mut socket: WebSocket, state: AppState, name: String) {
	let outputs = match state.supervisor.get_all_outputs(&name).await {
		Ok(o) => o,
		Err(_) => {
			return;
		}
	};

	// Send initial snapshots
	for (proc_name, capture) in &outputs {
		let snapshot = capture.snapshot().await;
		if !snapshot.is_empty() {
			let header = format!("\x1b[1m--- {} ---\x1b[0m\r\n", proc_name);
			let mut data = header.into_bytes();
			data.extend_from_slice(&snapshot);
			let _ = socket.send(Message::Binary(data.into())).await;
		}
	}

	// Subscribe to all outputs and forward
	let mut receivers: Vec<(String, tokio::sync::broadcast::Receiver<Vec<u8>>)> = outputs
		.iter()
		.map(|(name, capture)| (name.clone(), capture.subscribe()))
		.collect();

	loop {
		// Poll all receivers
		let mut any = false;
		for (_proc_name, rx) in &mut receivers {
			match rx.try_recv() {
				Ok(data) => {
					any = true;
					let _ = socket.send(Message::Binary(data.into())).await;
				}
				Err(tokio::sync::broadcast::error::TryRecvError::Lagged(_)) => {}
				Err(tokio::sync::broadcast::error::TryRecvError::Empty) => {}
				Err(tokio::sync::broadcast::error::TryRecvError::Closed) => {}
			}
		}
		if !any {
			tokio::time::sleep(std::time::Duration::from_millis(50)).await;
		}
	}
}

/// Fallback handler that serves the embedded web UI.
///
/// Resolution order:
/// 1. an embedded asset whose path equals the request path (without the
///    leading `/`);
/// 2. `index.html`, when the path is not under `_app/` and contains no `.`
///    (client-side routes of the single-page app);
/// 3. a plain `404 Not Found` response.
async fn static_handler(uri: Uri) -> impl IntoResponse {
	let requested = uri.path().trim_start_matches('/');

	// Exact asset match wins.
	if let Some(asset) = UiAssets::get(requested) {
		return serve_asset(requested, asset);
	}

	// Paths that look like files or build artifacts never fall back to the
	// SPA shell; everything else is treated as a client-side route.
	let looks_like_asset = requested.starts_with("_app/") || requested.contains('.');
	let spa_shell = if looks_like_asset {
		None
	} else {
		UiAssets::get("index.html")
	};
	if let Some(index) = spa_shell {
		return serve_asset("index.html", index);
	}

	// 404
	Response::builder()
		.status(StatusCode::NOT_FOUND)
		.body("Not Found".into())
		.unwrap()
}

/// Wraps an embedded file in a `200 OK` response.
///
/// The `Content-Type` header is guessed from `path`, falling back to
/// `application/octet-stream` when the extension is unknown.
fn serve_asset(path: &str, content: rust_embed::EmbeddedFile) -> Response {
	let content_type = mime_guess::from_path(path).first_or_octet_stream();

	let builder = Response::builder()
		.status(StatusCode::OK)
		.header(header::CONTENT_TYPE, content_type.as_ref());

	builder.body(content.data.into()).unwrap()
}