intent_engine/mcp/
ws_client.rs

// WebSocket client for MCP → Dashboard communication.
// Handles registration and keep-alive for MCP server instances.
3
use anyhow::{Context, Result};
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::time::Duration;
use tokio_tungstenite::{connect_async, tungstenite::Message};
10
11/// Project information sent to Dashboard
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct ProjectInfo {
14    pub path: String,
15    pub name: String,
16    pub db_path: String,
17    #[serde(skip_serializing_if = "Option::is_none")]
18    pub agent: Option<String>,
19}
20
21/// Message types sent by MCP client
22#[derive(Debug, Serialize)]
23#[serde(tag = "type")]
24enum McpMessage {
25    #[serde(rename = "register")]
26    Register { project: ProjectInfo },
27    #[serde(rename = "ping")]
28    Ping,
29}
30
31/// Response types from Dashboard
32#[derive(Debug, Deserialize)]
33#[serde(tag = "type")]
34enum DashboardResponse {
35    #[serde(rename = "registered")]
36    Registered { success: bool },
37    #[serde(rename = "pong")]
38    Pong,
39}
40
41/// Start WebSocket client connection to Dashboard
42/// This replaces the Registry-based registration mechanism
43pub async fn connect_to_dashboard(
44    project_path: PathBuf,
45    db_path: PathBuf,
46    agent: Option<String>,
47) -> Result<()> {
48    // Extract project name from path
49    let project_name = project_path
50        .file_name()
51        .and_then(|n| n.to_str())
52        .unwrap_or("unknown")
53        .to_string();
54
55    // Normalize paths to handle symlinks
56    let normalized_project_path = project_path
57        .canonicalize()
58        .unwrap_or_else(|_| project_path.clone());
59    let normalized_db_path = db_path.canonicalize().unwrap_or_else(|_| db_path.clone());
60
61    // Validate project path - reject temporary directories
62    // This prevents test environments from polluting the Dashboard registry
63    if normalized_project_path.starts_with("/tmp")
64        || normalized_project_path.starts_with(std::env::temp_dir())
65    {
66        tracing::warn!(
67            "Skipping Dashboard registration for temporary path: {}",
68            normalized_project_path.display()
69        );
70        return Ok(()); // Silently skip, don't error - non-fatal for MCP server
71    }
72
73    // Create project info
74    let project_info = ProjectInfo {
75        path: normalized_project_path.to_string_lossy().to_string(),
76        name: project_name,
77        db_path: normalized_db_path.to_string_lossy().to_string(),
78        agent,
79    };
80
81    // Connect to Dashboard WebSocket
82    let url = "ws://127.0.0.1:11391/ws/mcp";
83    let (ws_stream, _) = connect_async(url)
84        .await
85        .context("Failed to connect to Dashboard WebSocket")?;
86
87    tracing::debug!("Connected to Dashboard at {}", url);
88
89    let (mut write, mut read) = ws_stream.split();
90
91    // Send registration message
92    let register_msg = McpMessage::Register {
93        project: project_info.clone(),
94    };
95    let register_json = serde_json::to_string(&register_msg)?;
96    write
97        .send(Message::Text(register_json))
98        .await
99        .context("Failed to send register message")?;
100
101    // Wait for registration confirmation
102    if let Some(Ok(Message::Text(text))) = read.next().await {
103        match serde_json::from_str::<DashboardResponse>(&text) {
104            Ok(DashboardResponse::Registered { success: true }) => {
105                tracing::debug!("Successfully registered with Dashboard");
106            },
107            Ok(DashboardResponse::Registered { success: false }) => {
108                anyhow::bail!("Dashboard rejected registration");
109            },
110            _ => {
111                tracing::debug!("Unexpected response during registration: {}", text);
112            },
113        }
114    }
115
116    // Spawn ping task
117    let mut write_clone = write;
118    tokio::spawn(async move {
119        let mut interval = tokio::time::interval(Duration::from_secs(30));
120
121        loop {
122            interval.tick().await;
123
124            let ping_msg = McpMessage::Ping;
125            if let Ok(ping_json) = serde_json::to_string(&ping_msg) {
126                if write_clone.send(Message::Text(ping_json)).await.is_err() {
127                    tracing::warn!("Failed to send ping - Dashboard connection lost");
128                    break;
129                }
130            }
131        }
132    });
133
134    // Spawn read task to handle pongs and other messages
135    tokio::spawn(async move {
136        while let Some(Ok(msg)) = read.next().await {
137            match msg {
138                Message::Text(text) => match serde_json::from_str::<DashboardResponse>(&text) {
139                    Ok(DashboardResponse::Pong) => {
140                        tracing::debug!("Received pong from Dashboard");
141                    },
142                    _ => {
143                        tracing::debug!("Received message from Dashboard: {}", text);
144                    },
145                },
146                Message::Close(_) => {
147                    tracing::info!("Dashboard closed connection");
148                    break;
149                },
150                _ => {},
151            }
152        }
153    });
154
155    Ok(())
156}