#![allow(clippy::duplicated_attributes)]
#![cfg(feature = "serve")]
use axum::{
extract::{DefaultBodyLimit, Path, Query, State},
http::{header, StatusCode, Uri},
response::{sse, IntoResponse, Sse},
routing::{get, post},
Json, Router,
};
use futures::stream::StreamExt;
use parking_lot::Mutex;
use rust_embed::Embed;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tower_http::cors::CorsLayer;
use crate::http_types::SubgraphQuery;
use crate::server::{dispatch_tool, tool_schemas, McpConfig};
use crate::session::{ApplyBatchProgressSink, SessionState};
fn append_event_to_log(path: &std::path::Path, event: &SseEvent) {
use std::io::Write;
if let Ok(mut f) = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)
{
if let Ok(line) = serde_json::to_string(event) {
let _ = writeln!(f, "{}", line);
}
}
}
fn emit_followup_events(
event_tx: &broadcast::Sender<SseEvent>,
event_log_path: Option<&std::path::PathBuf>,
tool_name: &str,
source: &str,
agent_id: &str,
output: &serde_json::Value,
) {
if tool_name != "apply_batch" {
return;
}
let Some(progress_events) = output.get("progress_events").and_then(|v| v.as_array()) else {
return;
};
for progress_event in progress_events {
let sse_event = SseEvent {
event_type: "apply_batch_progress".to_string(),
data: serde_json::json!({
"tool": tool_name,
"source": source,
"agent_id": agent_id,
"batch_id": progress_event.get("batch_id").cloned().unwrap_or(serde_json::Value::Null),
"progress": progress_event,
"timestamp_ms": now_ms(),
}),
};
let _ = event_tx.send(sse_event.clone());
if let Some(log_path) = event_log_path {
append_event_to_log(log_path, &sse_event);
}
}
emit_apply_batch_handoff(event_tx, event_log_path, source, agent_id, output);
}
fn tool_result_summary(tool_name: &str, output: &serde_json::Value) -> serde_json::Value {
if tool_name != "apply_batch" {
return truncate_json(output, 500);
}
serde_json::json!({
"batch_id": output.get("batch_id").cloned().unwrap_or(serde_json::Value::Null),
"proof_state": output.get("proof_state").cloned().unwrap_or(serde_json::Value::Null),
"active_phase": output.get("active_phase").cloned().unwrap_or(serde_json::Value::Null),
"progress_pct": output.get("progress_pct").cloned().unwrap_or(serde_json::Value::Null),
"next_suggested_tool": output.get("next_suggested_tool").cloned().unwrap_or(serde_json::Value::Null),
"next_suggested_target": output.get("next_suggested_target").cloned().unwrap_or(serde_json::Value::Null),
"next_step_hint": output.get("next_step_hint").cloned().unwrap_or(serde_json::Value::Null),
"verification_verdict": output
.get("verification")
.and_then(|value| value.get("verdict"))
.cloned()
.unwrap_or(serde_json::Value::Null),
"progress_event_count": output
.get("progress_events")
.and_then(|value| value.as_array())
.map(|value| value.len())
.unwrap_or(0),
})
}
fn emit_apply_batch_handoff(
event_tx: &broadcast::Sender<SseEvent>,
event_log_path: Option<&std::path::PathBuf>,
source: &str,
agent_id: &str,
output: &serde_json::Value,
) {
let batch_id = output
.get("batch_id")
.cloned()
.unwrap_or(serde_json::Value::Null);
let proof_state = output
.get("proof_state")
.cloned()
.unwrap_or(serde_json::Value::Null);
let next_suggested_tool = output
.get("next_suggested_tool")
.cloned()
.unwrap_or(serde_json::Value::Null);
let next_suggested_target = output
.get("next_suggested_target")
.cloned()
.unwrap_or(serde_json::Value::Null);
let next_step_hint = output
.get("next_step_hint")
.cloned()
.unwrap_or(serde_json::Value::Null);
if batch_id.is_null()
&& proof_state.is_null()
&& next_suggested_tool.is_null()
&& next_suggested_target.is_null()
&& next_step_hint.is_null()
{
return;
}
let sse_event = SseEvent {
event_type: "apply_batch_handoff".to_string(),
data: serde_json::json!({
"tool": "apply_batch",
"source": source,
"agent_id": agent_id,
"batch_id": batch_id,
"proof_state": proof_state,
"next_suggested_tool": next_suggested_tool,
"next_suggested_target": next_suggested_target,
"next_step_hint": next_step_hint,
"timestamp_ms": now_ms(),
}),
};
let _ = event_tx.send(sse_event.clone());
if let Some(log_path) = event_log_path {
append_event_to_log(log_path, &sse_event);
}
}
fn apply_batch_progress_sink(
event_tx: broadcast::Sender<SseEvent>,
event_log_path: Option<std::path::PathBuf>,
source: String,
agent_id: String,
) -> ApplyBatchProgressSink {
Arc::new(move |progress_event| {
let sse_event = SseEvent {
event_type: "apply_batch_progress".to_string(),
data: serde_json::json!({
"tool": "apply_batch",
"source": source,
"agent_id": agent_id,
"batch_id": progress_event.batch_id,
"progress": progress_event,
"timestamp_ms": now_ms(),
}),
};
let _ = event_tx.send(sse_event.clone());
if let Some(ref log_path) = event_log_path {
append_event_to_log(log_path, &sse_event);
}
})
}
async fn watch_event_log(path: std::path::PathBuf, tx: broadcast::Sender<SseEvent>) {
use tokio::io::AsyncBufReadExt;
loop {
if path.exists() {
break;
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
let file = match tokio::fs::File::open(&path).await {
Ok(f) => f,
Err(e) => {
eprintln!("[m1nd-mcp] Failed to open event log for watching: {}", e);
return;
}
};
let mut reader = tokio::io::BufReader::new(file);
{
let mut discard = String::new();
loop {
discard.clear();
match reader.read_line(&mut discard).await {
Ok(0) => break, Ok(_) => continue,
Err(_) => break,
}
}
}
eprintln!(
"[m1nd-mcp] Watching event log: {} (tailing new events)",
path.display()
);
let mut line = String::new();
loop {
line.clear();
match reader.read_line(&mut line).await {
Ok(0) => {
tokio::time::sleep(Duration::from_millis(100)).await;
}
Ok(_) => {
let trimmed = line.trim();
if !trimmed.is_empty() {
if let Ok(event) = serde_json::from_str::<SseEvent>(trimmed) {
let _ = tx.send(event);
}
}
}
Err(e) => {
eprintln!("[m1nd-mcp] Event log read error: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
}
#[derive(Embed)]
#[folder = "../m1nd-ui/dist/"]
#[prefix = ""]
struct UiAssets;
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SseEvent {
pub event_type: String,
pub data: serde_json::Value,
}
pub struct AppState {
pub session: Arc<Mutex<SessionState>>,
pub tool_schemas_cache: serde_json::Value,
pub event_tx: broadcast::Sender<SseEvent>,
pub event_log_path: Option<std::path::PathBuf>,
}
const TOOL_TIMEOUT_SECS: u64 = 120;
pub fn spawn_background(
session: Arc<Mutex<SessionState>>,
port: u16,
) -> tokio::task::JoinHandle<()> {
let schemas_full = tool_schemas();
let tool_schemas_cache = schemas_full
.get("tools")
.cloned()
.unwrap_or(serde_json::Value::Array(vec![]));
let (event_tx, _) = broadcast::channel::<SseEvent>(64);
let app_state = Arc::new(AppState {
session,
tool_schemas_cache,
event_tx,
event_log_path: None,
});
let router = build_router(app_state, false);
let addr: std::net::SocketAddr = format!("127.0.0.1:{}", port)
.parse()
.expect("valid socket addr");
tokio::spawn(async move {
match tokio::net::TcpListener::bind(addr).await {
Ok(listener) => {
eprintln!("[m1nd-mcp] m1nd GUI: http://localhost:{}", port);
let open_port = port;
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(400)).await;
let url = format!("http://localhost:{}", open_port);
let _ = open_browser(&url);
});
let _ = axum::serve(listener, router).await;
}
Err(e) => {
eprintln!(
"[m1nd-mcp] Background HTTP server failed to bind to {}: {} (GUI unavailable)",
addr, e
);
}
}
})
}
#[allow(clippy::too_many_arguments)]
pub async fn run(
config: McpConfig,
port: u16,
bind: String,
dev_mode: bool,
auto_open: bool,
also_stdio: bool,
event_log: Option<String>,
watch_events: Option<String>,
) {
if bind == "0.0.0.0" {
eprintln!("[m1nd-mcp] WARNING: Binding to 0.0.0.0 exposes the server to the network.");
eprintln!("[m1nd-mcp] WARNING: No authentication is configured. Anyone on the network can access the API.");
}
let server = match crate::server::McpServer::new(config.clone()) {
Ok(s) => s,
Err(e) => {
eprintln!("[m1nd-mcp] Failed to create server: {}", e);
std::process::exit(1);
}
};
let session_state = server.into_session_state();
let session = Arc::new(Mutex::new(session_state));
let schemas_full = tool_schemas();
let tool_schemas_cache = schemas_full
.get("tools")
.cloned()
.unwrap_or(serde_json::Value::Array(vec![]));
let (event_tx, _) = broadcast::channel::<SseEvent>(64);
let event_log_path = event_log.map(std::path::PathBuf::from);
let app_state = Arc::new(AppState {
session: session.clone(),
tool_schemas_cache,
event_tx: event_tx.clone(),
event_log_path: event_log_path.clone(),
});
if let Some(ref watch_path) = watch_events {
let path = std::path::PathBuf::from(watch_path);
let tx = event_tx.clone();
tokio::spawn(watch_event_log(path, tx));
}
let router = build_router(app_state, dev_mode);
if also_stdio {
let stdio_session = session.clone();
let stdio_event_tx = event_tx.clone();
let stdio_event_log = event_log_path.clone();
tokio::task::spawn_blocking(move || {
eprintln!("[m1nd-mcp] Stdio JSON-RPC also active (--stdio). SSE cross-process bridge enabled.");
let stdin = std::io::stdin();
let stdout = std::io::stdout();
let mut reader = stdin.lock();
let mut writer = stdout.lock();
use std::io::{BufRead, Write};
loop {
let mut line = String::new();
match reader.read_line(&mut line) {
Ok(0) | Err(_) => break,
_ => {}
}
let trimmed = line.trim();
if trimmed.is_empty() {
continue;
}
if let Ok(req) = serde_json::from_str::<serde_json::Value>(trimmed) {
if req.get("method").and_then(|m| m.as_str()) == Some("tools/call") {
let tool_name = req
.pointer("/params/name")
.and_then(|v| v.as_str())
.unwrap_or("");
let arguments = req
.pointer("/params/arguments")
.cloned()
.unwrap_or(serde_json::json!({}));
let result = {
let mut s = stdio_session.lock();
if tool_name == "apply_batch" {
s.apply_batch_progress_sink = Some(apply_batch_progress_sink(
stdio_event_tx.clone(),
stdio_event_log.clone(),
"stdio".to_string(),
arguments
.get("agent_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string(),
));
}
let result = dispatch_tool(&mut s, tool_name, &arguments);
s.apply_batch_progress_sink = None;
result
};
let sse_event = SseEvent {
event_type: "tool_result".to_string(),
data: serde_json::json!({
"tool": tool_name,
"source": "stdio",
"agent_id": arguments.get("agent_id").and_then(|v| v.as_str()).unwrap_or("unknown"),
"success": result.is_ok(),
"result_preview": match &result {
Ok(v) => truncate_json(v, 500),
Err(e) => serde_json::json!({"error": e.to_string()}),
},
"timestamp_ms": std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0),
}),
};
let _ = stdio_event_tx.send(sse_event.clone());
if let Some(ref log_path) = stdio_event_log {
append_event_to_log(log_path, &sse_event);
}
let resp = match result {
Ok(output) => serde_json::json!({
"jsonrpc": "2.0",
"id": req.get("id").cloned().unwrap_or(serde_json::Value::Null),
"result": { "content": [{ "type": "text", "text": serde_json::to_string(&output).unwrap_or_default() }] }
}),
Err(e) => serde_json::json!({
"jsonrpc": "2.0",
"id": req.get("id").cloned().unwrap_or(serde_json::Value::Null),
"error": { "code": -32603, "message": e.to_string() }
}),
};
let _ = writeln!(
writer,
"{}",
serde_json::to_string(&resp).unwrap_or_default()
);
let _ = writer.flush();
} else if req.get("method").and_then(|m| m.as_str()) == Some("tools/list") {
let schemas = tool_schemas();
let resp = serde_json::json!({
"jsonrpc": "2.0",
"id": req.get("id").cloned().unwrap_or(serde_json::Value::Null),
"result": schemas
});
let _ = writeln!(
writer,
"{}",
serde_json::to_string(&resp).unwrap_or_default()
);
let _ = writer.flush();
}
}
}
});
}
let addr: std::net::SocketAddr = format!("{}:{}", bind, port).parse().unwrap_or_else(|_| {
eprintln!("[m1nd-mcp] Invalid bind address: {}:{}", bind, port);
std::process::exit(1);
});
eprintln!("[m1nd-mcp] HTTP server listening on http://{}", addr);
if auto_open {
let url = format!("http://localhost:{}", port);
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(500)).await;
let _ = open_browser(&url);
});
}
let listener = match tokio::net::TcpListener::bind(addr).await {
Ok(l) => l,
Err(e) => {
eprintln!("[m1nd-mcp] Failed to bind to {}: {}", addr, e);
std::process::exit(1);
}
};
let shutdown_session = session.clone();
axum::serve(listener, router)
.with_graceful_shutdown(async move {
let _ = tokio::signal::ctrl_c().await;
eprintln!("[m1nd-mcp] SIGINT received, shutting down...");
let mut s = shutdown_session.lock();
if let Err(e) = s.persist() {
eprintln!("[m1nd-mcp] Failed to persist state on shutdown: {}", e);
}
eprintln!("[m1nd-mcp] State persisted. Goodbye.");
})
.await
.expect("HTTP server failed");
}
fn open_browser(url: &str) -> std::io::Result<()> {
#[cfg(target_os = "macos")]
{
std::process::Command::new("open").arg(url).spawn()?;
}
#[cfg(target_os = "linux")]
{
std::process::Command::new("xdg-open").arg(url).spawn()?;
}
#[cfg(target_os = "windows")]
{
std::process::Command::new("cmd")
.args(["/C", "start", url])
.spawn()?;
}
Ok(())
}
fn truncate_json(value: &serde_json::Value, max_chars: usize) -> serde_json::Value {
let s = serde_json::to_string(value).unwrap_or_default();
if s.len() <= max_chars {
value.clone()
} else {
serde_json::Value::String(format!("{}...(truncated)", &s[..max_chars]))
}
}
fn now_ms() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_millis() as u64)
.unwrap_or(0)
}
fn timeout_error_payload(timeout_secs: u64) -> serde_json::Value {
serde_json::json!({
"error_type": "timeout",
"timeout_secs": timeout_secs,
"hint": format!(
"Tool execution exceeded {}s. Try narrowing scope or using incremental mode.",
timeout_secs
),
})
}
fn tool_error_payload(e: &m1nd_core::error::M1ndError) -> serde_json::Value {
serde_json::json!({
"error": "tool_error",
"message": e.to_string(),
})
}
pub fn build_router(state: Arc<AppState>, dev_mode: bool) -> Router {
let api = Router::new()
.route("/api/health", get(handle_health))
.route("/api/tools", get(handle_list_tools))
.route("/api/tools/{*tool_name}", post(handle_tool_call))
.route("/api/graph/stats", get(handle_graph_stats))
.route("/api/graph/subgraph", get(handle_subgraph))
.route("/api/graph/snapshot", get(handle_graph_snapshot))
.route("/api/events", get(handle_sse))
.with_state(state.clone())
.layer(DefaultBodyLimit::max(1_048_576));
if dev_mode {
let ui_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("../m1nd-ui/dist");
api.fallback_service(tower_http::services::ServeDir::new(ui_dir))
.layer(CorsLayer::permissive())
} else {
api.fallback(serve_embedded_ui)
.layer(CorsLayer::permissive())
}
}
async fn handle_health(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let state = state.clone();
let result = tokio::task::spawn_blocking(move || {
let session = state.session.lock();
let graph = session.graph.read();
let node_count = graph.num_nodes() as usize;
let edge_count = graph.num_edges();
drop(graph);
serde_json::json!({
"status": if node_count > 0 { "ok" } else { "empty" },
"uptime_secs": session.uptime_seconds(),
"node_count": node_count,
"edge_count": edge_count,
"queries_processed": session.queries_processed,
"agent_sessions": session.session_summary(),
"domain": session.domain.name.as_str(),
"graph_generation": session.graph_generation,
"plasticity_generation": session.plasticity_generation,
})
})
.await
.expect("spawn_blocking panicked");
(StatusCode::OK, Json(result))
}
async fn handle_list_tools(State(state): State<Arc<AppState>>) -> impl IntoResponse {
(
StatusCode::OK,
Json(serde_json::json!({ "tools": state.tool_schemas_cache })),
)
}
async fn handle_tool_call(
State(state): State<Arc<AppState>>,
Path(tool_name): Path<String>,
Json(body): Json<serde_json::Value>,
) -> impl IntoResponse {
let event_tx = state.event_tx.clone();
let event_log_path = state.event_log_path.clone();
let tool_for_event = tool_name.clone();
let agent_id_for_event = body
.get("agent_id")
.and_then(|v| v.as_str())
.unwrap_or("unknown")
.to_string();
let state = state.clone();
let tool = tool_name.clone();
let progress_event_tx = event_tx.clone();
let progress_event_log_path = event_log_path.clone();
let progress_agent_id = agent_id_for_event.clone();
let result = tokio::time::timeout(
Duration::from_secs(TOOL_TIMEOUT_SECS),
tokio::task::spawn_blocking(move || {
let mut session = state.session.lock();
if tool == "apply_batch" {
session.apply_batch_progress_sink = Some(apply_batch_progress_sink(
progress_event_tx.clone(),
progress_event_log_path.clone(),
"http".to_string(),
progress_agent_id.clone(),
));
}
let result = dispatch_tool(&mut session, &tool, &body);
session.apply_batch_progress_sink = None;
result
}),
)
.await;
match result {
Err(_elapsed) => {
let sse_event = SseEvent {
event_type: "tool_timeout".to_string(),
data: serde_json::json!({
"tool": tool_for_event,
"source": "http",
"agent_id": agent_id_for_event,
"timeout_secs": TOOL_TIMEOUT_SECS,
"timestamp_ms": now_ms(),
}),
};
let _ = event_tx.send(sse_event.clone());
if let Some(ref log_path) = event_log_path {
append_event_to_log(log_path, &sse_event);
}
(
StatusCode::GATEWAY_TIMEOUT,
Json(timeout_error_payload(TOOL_TIMEOUT_SECS)),
)
.into_response()
}
Ok(inner) => {
let inner = inner.expect("spawn_blocking panicked");
let sse_event = SseEvent {
event_type: "tool_result".to_string(),
data: serde_json::json!({
"tool": tool_for_event,
"source": "http",
"agent_id": agent_id_for_event,
"success": inner.is_ok(),
"result_preview": match &inner {
Ok(v) => tool_result_summary(&tool_for_event, v),
Err(e) => serde_json::json!({"error": e.to_string()}),
},
"timestamp_ms": now_ms(),
}),
};
let _ = event_tx.send(sse_event.clone());
if let Some(ref log_path) = event_log_path {
append_event_to_log(log_path, &sse_event);
}
if let Ok(output) = &inner {
if tool_for_event == "apply_batch" {
emit_apply_batch_handoff(
&event_tx,
event_log_path.as_ref(),
"http",
&agent_id_for_event,
output,
);
}
}
match inner {
Ok(output) => (
StatusCode::OK,
Json(serde_json::json!({ "result": output })),
)
.into_response(),
Err(e) => {
let (status, error_type) = match &e {
m1nd_core::error::M1ndError::UnknownTool { .. } => {
(StatusCode::NOT_FOUND, "unknown_tool")
}
m1nd_core::error::M1ndError::InvalidParams { .. } => {
(StatusCode::BAD_REQUEST, "invalid_params")
}
m1nd_core::error::M1ndError::Serde(_) => {
(StatusCode::BAD_REQUEST, "invalid_json")
}
_ => (StatusCode::INTERNAL_SERVER_ERROR, "internal"),
};
let mut payload = tool_error_payload(&e);
payload["error"] = serde_json::json!(error_type);
(status, Json(payload)).into_response()
}
}
}
}
}
async fn handle_subgraph(
State(state): State<Arc<AppState>>,
Query(params): Query<SubgraphQuery>,
) -> impl IntoResponse {
let state = state.clone();
let top_k = params.clamped_top_k(); let query = params.query.clone();
let result: serde_json::Value = tokio::task::spawn_blocking(move || {
let start = std::time::Instant::now();
let mut session = state.session.lock();
let activate_params = serde_json::json!({
"query": query,
"agent_id": "gui-subgraph",
"top_k": top_k,
"include_ghost_edges": true,
"include_structural_holes": false,
});
let activate_result = dispatch_tool(&mut session, "m1nd_activate", &activate_params);
match activate_result {
Err(e) => {
serde_json::json!({
"nodes": [],
"edges": [],
"meta": {
"total_nodes": 0,
"rendered_nodes": 0,
"query": query,
"elapsed_ms": start.elapsed().as_millis() as u64,
"error": e.to_string(),
}
})
}
Ok(output) => {
let graph = session.graph.read();
let n = graph.num_nodes() as usize;
let mut node_to_ext: Vec<String> = vec![String::new(); n];
for (interned, &nid) in &graph.id_to_node {
let idx = nid.as_usize();
if idx < n {
node_to_ext[idx] = graph.strings.resolve(*interned).to_string();
}
}
let activated = output
.get("activated")
.and_then(|v| v.as_array())
.cloned()
.unwrap_or_default();
let total_nodes = activated.len();
let mut top_node_ids: Vec<m1nd_core::types::NodeId> = Vec::new();
let mut top_ext_ids: std::collections::HashSet<String> =
std::collections::HashSet::new();
let mut subgraph_nodes: Vec<serde_json::Value> = Vec::new();
for node_val in activated.iter().take(top_k) {
let ext_id = node_val
.get("node_id")
.and_then(|v| v.as_str())
.unwrap_or("");
if ext_id.is_empty() {
continue;
}
if let Some(nid) = graph.resolve_id(ext_id) {
let idx = nid.as_usize();
if idx < n {
top_node_ids.push(nid);
top_ext_ids.insert(ext_id.to_string());
let label = graph.strings.resolve(graph.nodes.label[idx]).to_string();
let node_type_val = node_type_to_u8(graph.nodes.node_type[idx]);
let activation = node_val
.get("activation")
.and_then(|v| v.as_f64())
.unwrap_or(0.0) as f32;
let tags: Vec<String> = graph.nodes.tags[idx]
.iter()
.map(|&t| graph.strings.resolve(t).to_string())
.collect();
let provenance = graph.resolve_node_provenance(nid);
let pagerank = if idx < graph.nodes.pagerank.len() {
graph.nodes.pagerank[idx].get()
} else {
0.0
};
subgraph_nodes.push(serde_json::json!({
"id": ext_id,
"label": label,
"node_type": node_type_val,
"activation": activation,
"tags": tags,
"source_path": provenance.source_path,
"pagerank": pagerank,
}));
}
}
}
let mut subgraph_edges: Vec<serde_json::Value> = Vec::new();
for &nid in &top_node_ids {
if !graph.finalized {
continue;
}
let range = graph.csr.out_range(nid);
for j in range {
let tgt = graph.csr.targets[j];
let tgt_idx = tgt.as_usize();
if tgt_idx < n && top_ext_ids.contains(&node_to_ext[tgt_idx]) {
let src_ext = &node_to_ext[nid.as_usize()];
let tgt_ext = &node_to_ext[tgt_idx];
let weight = graph
.csr
.read_weight(m1nd_core::types::EdgeIdx::new(j as u32))
.get();
let relation =
graph.strings.resolve(graph.csr.relations[j]).to_string();
subgraph_edges.push(serde_json::json!({
"source": src_ext,
"target": tgt_ext,
"weight": weight,
"relation": relation,
}));
}
}
}
if let Some(ghost_edges) = output.get("ghost_edges").and_then(|v| v.as_array()) {
for ge in ghost_edges {
let src = ge.get("source").and_then(|v| v.as_str()).unwrap_or("");
let tgt = ge.get("target").and_then(|v| v.as_str()).unwrap_or("");
if top_ext_ids.contains(src) && top_ext_ids.contains(tgt) {
let strength =
ge.get("strength").and_then(|v| v.as_f64()).unwrap_or(0.0);
subgraph_edges.push(serde_json::json!({
"source": src,
"target": tgt,
"weight": strength,
"relation": "ghost",
}));
}
}
}
let rendered = subgraph_nodes.len();
serde_json::json!({
"nodes": subgraph_nodes,
"edges": subgraph_edges,
"meta": {
"total_nodes": total_nodes,
"rendered_nodes": rendered,
"query": query,
"elapsed_ms": start.elapsed().as_millis() as u64,
}
})
}
}
})
.await
.expect("spawn_blocking panicked");
(StatusCode::OK, Json(result))
}
async fn handle_graph_stats(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let state = state.clone();
let result = tokio::task::spawn_blocking(move || {
let session = state.session.lock();
let graph = session.graph.read();
serde_json::json!({
"node_count": graph.num_nodes(),
"edge_count": graph.num_edges(),
"domain": session.domain.name.as_str(),
"namespaces": serde_json::Value::Array(vec![]),
"memory_estimate_bytes": 0_usize,
})
})
.await
.expect("spawn_blocking panicked");
(StatusCode::OK, Json(result))
}
async fn handle_graph_snapshot(State(state): State<Arc<AppState>>) -> impl IntoResponse {
let state = state.clone();
let result: serde_json::Value = tokio::task::spawn_blocking(move || {
let session = state.session.lock();
let graph = session.graph.read();
let n = graph.num_nodes() as usize;
let mut node_to_ext: Vec<String> = vec![String::new(); n];
for (interned, &nid) in &graph.id_to_node {
let idx = nid.as_usize();
if idx < n {
node_to_ext[idx] = graph.strings.resolve(*interned).to_string();
}
}
let mut nodes = Vec::with_capacity(n);
#[allow(clippy::needless_range_loop)]
for i in 0..n {
let label = graph.strings.resolve(graph.nodes.label[i]).to_string();
let tags: Vec<String> = graph.nodes.tags[i]
.iter()
.map(|&t| graph.strings.resolve(t).to_string())
.collect();
let provenance = graph.resolve_node_provenance(m1nd_core::types::NodeId::new(i as u32));
nodes.push(serde_json::json!({
"external_id": node_to_ext[i],
"label": label,
"node_type": node_type_to_u8(graph.nodes.node_type[i]),
"tags": tags,
"last_modified": graph.nodes.last_modified[i],
"change_frequency": graph.nodes.change_frequency[i].get(),
"provenance": {
"source_path": provenance.source_path,
"line_start": provenance.line_start,
"line_end": provenance.line_end,
"namespace": provenance.namespace,
"canonical": provenance.canonical,
},
}));
}
let mut edges = Vec::new();
if graph.finalized {
for src in 0..n {
let range = graph.csr.out_range(m1nd_core::types::NodeId::new(src as u32));
for j in range {
let tgt = graph.csr.targets[j].as_usize();
let dir = graph.csr.directions[j];
if dir == m1nd_core::types::EdgeDirection::Bidirectional && src > tgt {
continue;
}
let relation = graph.strings.resolve(graph.csr.relations[j]).to_string();
let weight = graph.csr.read_weight(m1nd_core::types::EdgeIdx::new(j as u32)).get();
edges.push(serde_json::json!({
"source_id": node_to_ext[src],
"target_id": node_to_ext[tgt],
"relation": relation,
"weight": weight,
"direction": if dir == m1nd_core::types::EdgeDirection::Bidirectional { 1 } else { 0 },
"inhibitory": graph.csr.inhibitory[j],
"causal_strength": graph.csr.causal_strengths[j].get(),
}));
}
}
}
serde_json::json!({
"version": 1,
"nodes": nodes,
"edges": edges,
})
})
.await
.expect("spawn_blocking panicked");
(StatusCode::OK, Json(result))
}
async fn handle_sse(
State(state): State<Arc<AppState>>,
) -> Sse<impl futures::Stream<Item = Result<sse::Event, std::convert::Infallible>>> {
let rx = state.event_tx.subscribe();
let stream = tokio_stream::wrappers::BroadcastStream::new(rx).filter_map(|event| async {
match event {
Ok(e) => {
let sse_event = sse::Event::default()
.event(e.event_type)
.json_data(e.data)
.ok()?;
Some(Ok(sse_event))
}
Err(_) => None,
}
});
Sse::new(stream)
}
fn node_type_to_u8(nt: m1nd_core::types::NodeType) -> u8 {
use m1nd_core::types::NodeType;
match nt {
NodeType::File => 0,
NodeType::Directory => 1,
NodeType::Function => 2,
NodeType::Class => 3,
NodeType::Struct => 4,
NodeType::Enum => 5,
NodeType::Type => 6,
NodeType::Module => 7,
NodeType::Reference => 8,
NodeType::Concept => 9,
NodeType::Material => 10,
NodeType::Process => 11,
NodeType::Product => 12,
NodeType::Supplier => 13,
NodeType::Regulatory => 14,
NodeType::System => 15,
NodeType::Cost => 16,
NodeType::Custom(v) => v,
}
}
async fn serve_embedded_ui(uri: Uri) -> impl IntoResponse {
let path = uri.path().trim_start_matches('/');
let path = if path.is_empty() { "index.html" } else { path };
match UiAssets::get(path) {
Some(content) => {
let mime = mime_guess::from_path(path).first_or_octet_stream();
(
StatusCode::OK,
[(header::CONTENT_TYPE, mime.as_ref().to_string())],
content.data.into_owned(),
)
.into_response()
}
None => {
match UiAssets::get("index.html") {
Some(content) => (
StatusCode::OK,
[(header::CONTENT_TYPE, "text/html".to_string())],
content.data.into_owned(),
)
.into_response(),
None => (StatusCode::NOT_FOUND, "UI not built").into_response(),
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn emit_followup_events_replays_apply_batch_progress() {
let (tx, mut rx) = broadcast::channel::<SseEvent>(16);
let output = serde_json::json!({
"batch_id": "batch-1",
"proof_state": "proving",
"next_suggested_tool": "heuristics_surface",
"next_suggested_target": "src/core.py",
"next_step_hint": "Inspect the hotspot before promotion.",
"progress_events": [
{
"batch_id": "batch-1",
"event_type": "phase_completed",
"phase": "validate",
"phase_index": 0,
"progress_pct": 20.0
},
{
"batch_id": "batch-1",
"event_type": "batch_completed",
"phase": "done",
"phase_index": 4,
"progress_pct": 100.0
}
]
});
emit_followup_events(&tx, None, "apply_batch", "http", "tester", &output);
let first = rx.try_recv().expect("first progress event");
let second = rx.try_recv().expect("second progress event");
let third = rx.try_recv().expect("handoff event");
assert_eq!(first.event_type, "apply_batch_progress");
assert_eq!(second.event_type, "apply_batch_progress");
assert_eq!(third.event_type, "apply_batch_handoff");
assert_eq!(first.data["batch_id"].as_str(), Some("batch-1"));
assert_eq!(second.data["batch_id"].as_str(), Some("batch-1"));
assert_eq!(third.data["batch_id"].as_str(), Some("batch-1"));
assert_eq!(
third.data["next_suggested_tool"].as_str(),
Some("heuristics_surface")
);
assert_eq!(first.data["progress"]["phase"].as_str(), Some("validate"));
assert_eq!(second.data["progress"]["phase"].as_str(), Some("done"));
}
#[test]
fn tool_result_summary_compacts_apply_batch_for_sse_consumers() {
let output = serde_json::json!({
"batch_id": "batch-42",
"proof_state": "ready_to_edit",
"active_phase": "done",
"progress_pct": 100.0,
"next_step_hint": "Safe to continue.",
"verification": {"verdict": "SAFE"},
"progress_events": [{}, {}, {}]
});
let summary = tool_result_summary("apply_batch", &output);
assert_eq!(summary["batch_id"], "batch-42");
assert_eq!(summary["proof_state"], "ready_to_edit");
assert_eq!(summary["verification_verdict"], "SAFE");
assert_eq!(summary["progress_event_count"], 3);
}
#[test]
fn emit_apply_batch_handoff_skips_empty_payloads() {
let (tx, mut rx) = broadcast::channel::<SseEvent>(16);
emit_apply_batch_handoff(&tx, None, "http", "tester", &serde_json::json!({}));
assert!(rx.try_recv().is_err());
}
#[test]
fn timeout_payload_teaches_how_to_retry() {
let payload = timeout_error_payload(30);
assert_eq!(payload["error_type"], "timeout");
assert!(payload["hint"].as_str().expect("hint").contains("scope"));
}
}