intent_engine/mcp/
ws_client.rs

1// WebSocket client for MCP → Dashboard communication
2// Handles registration and keep-alive for MCP server instances
3
4use anyhow::{Context, Result};
5use futures_util::{SinkExt, StreamExt};
6use serde::{Deserialize, Serialize};
7use std::path::PathBuf;
8use std::time::Duration;
9use tokio_tungstenite::{connect_async, tungstenite::Message};
10
11/// Project information sent to Dashboard
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct ProjectInfo {
14    pub path: String,
15    pub name: String,
16    pub db_path: String,
17    #[serde(skip_serializing_if = "Option::is_none")]
18    pub agent: Option<String>,
19}
20
21/// Message types sent by MCP client
22#[derive(Debug, Serialize)]
23#[serde(tag = "type")]
24enum McpMessage {
25    #[serde(rename = "register")]
26    Register { project: ProjectInfo },
27    #[serde(rename = "ping")]
28    Ping,
29}
30
31/// Response types from Dashboard
32#[derive(Debug, Deserialize)]
33#[serde(tag = "type")]
34enum DashboardResponse {
35    #[serde(rename = "registered")]
36    Registered { success: bool },
37    #[serde(rename = "pong")]
38    Pong,
39}
40
41/// Start WebSocket client connection to Dashboard
42/// This replaces the Registry-based registration mechanism
43pub async fn connect_to_dashboard(
44    project_path: PathBuf,
45    db_path: PathBuf,
46    agent: Option<String>,
47) -> Result<()> {
48    // Extract project name from path
49    let project_name = project_path
50        .file_name()
51        .and_then(|n| n.to_str())
52        .unwrap_or("unknown")
53        .to_string();
54
55    // Normalize paths to handle symlinks
56    let normalized_project_path = project_path
57        .canonicalize()
58        .unwrap_or_else(|_| project_path.clone());
59    let normalized_db_path = db_path.canonicalize().unwrap_or_else(|_| db_path.clone());
60
61    // Validate project path - reject temporary directories (Defense Layer 2)
62    // This prevents test environments from polluting the Dashboard registry
63    // IMPORTANT: Canonicalize temp_dir to match normalized_project_path format (fixes Windows UNC paths)
64    let temp_dir = std::env::temp_dir()
65        .canonicalize()
66        .unwrap_or_else(|_| std::env::temp_dir());
67    if normalized_project_path.starts_with(&temp_dir) {
68        tracing::warn!(
69            "Skipping Dashboard registration for temporary path: {}",
70            normalized_project_path.display()
71        );
72        return Ok(()); // Silently skip, don't error - non-fatal for MCP server
73    }
74
75    // Create project info
76    let project_info = ProjectInfo {
77        path: normalized_project_path.to_string_lossy().to_string(),
78        name: project_name,
79        db_path: normalized_db_path.to_string_lossy().to_string(),
80        agent,
81    };
82
83    // Connect to Dashboard WebSocket
84    let url = "ws://127.0.0.1:11391/ws/mcp";
85    let (ws_stream, _) = connect_async(url)
86        .await
87        .context("Failed to connect to Dashboard WebSocket")?;
88
89    tracing::debug!("Connected to Dashboard at {}", url);
90
91    let (mut write, mut read) = ws_stream.split();
92
93    // Send registration message
94    let register_msg = McpMessage::Register {
95        project: project_info.clone(),
96    };
97    let register_json = serde_json::to_string(&register_msg)?;
98    write
99        .send(Message::Text(register_json))
100        .await
101        .context("Failed to send register message")?;
102
103    // Wait for registration confirmation
104    if let Some(Ok(Message::Text(text))) = read.next().await {
105        match serde_json::from_str::<DashboardResponse>(&text) {
106            Ok(DashboardResponse::Registered { success: true }) => {
107                tracing::debug!("Successfully registered with Dashboard");
108            },
109            Ok(DashboardResponse::Registered { success: false }) => {
110                anyhow::bail!("Dashboard rejected registration");
111            },
112            _ => {
113                tracing::debug!("Unexpected response during registration: {}", text);
114            },
115        }
116    }
117
118    // Spawn ping task
119    let mut write_clone = write;
120    tokio::spawn(async move {
121        let mut interval = tokio::time::interval(Duration::from_secs(30));
122
123        loop {
124            interval.tick().await;
125
126            let ping_msg = McpMessage::Ping;
127            if let Ok(ping_json) = serde_json::to_string(&ping_msg) {
128                if write_clone.send(Message::Text(ping_json)).await.is_err() {
129                    tracing::warn!("Failed to send ping - Dashboard connection lost");
130                    break;
131                }
132            }
133        }
134    });
135
136    // Spawn read task to handle pongs and other messages
137    tokio::spawn(async move {
138        while let Some(Ok(msg)) = read.next().await {
139            match msg {
140                Message::Text(text) => match serde_json::from_str::<DashboardResponse>(&text) {
141                    Ok(DashboardResponse::Pong) => {
142                        tracing::debug!("Received pong from Dashboard");
143                    },
144                    _ => {
145                        tracing::debug!("Received message from Dashboard: {}", text);
146                    },
147                },
148                Message::Close(_) => {
149                    tracing::info!("Dashboard closed connection");
150                    break;
151                },
152                _ => {},
153            }
154        }
155    });
156
157    Ok(())
158}