intent_engine/mcp/
ws_client.rs1use anyhow::{Context, Result};
5use futures_util::{SinkExt, StreamExt};
6use serde::{Deserialize, Serialize};
7use std::path::PathBuf;
8use std::time::Duration;
9use tokio_tungstenite::{connect_async, tungstenite::Message};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct ProjectInfo {
14 pub path: String,
15 pub name: String,
16 pub db_path: String,
17 #[serde(skip_serializing_if = "Option::is_none")]
18 pub agent: Option<String>,
19}
20
21#[derive(Debug, Serialize)]
23#[serde(tag = "type")]
24enum McpMessage {
25 #[serde(rename = "register")]
26 Register { project: ProjectInfo },
27 #[serde(rename = "ping")]
28 Ping,
29}
30
31#[derive(Debug, Deserialize)]
33#[serde(tag = "type")]
34enum DashboardResponse {
35 #[serde(rename = "registered")]
36 Registered { success: bool },
37 #[serde(rename = "pong")]
38 Pong,
39}
40
41pub async fn connect_to_dashboard(
44 project_path: PathBuf,
45 db_path: PathBuf,
46 agent: Option<String>,
47) -> Result<()> {
48 let project_name = project_path
50 .file_name()
51 .and_then(|n| n.to_str())
52 .unwrap_or("unknown")
53 .to_string();
54
55 let normalized_project_path = project_path
57 .canonicalize()
58 .unwrap_or_else(|_| project_path.clone());
59 let normalized_db_path = db_path.canonicalize().unwrap_or_else(|_| db_path.clone());
60
61 let temp_dir = std::env::temp_dir()
65 .canonicalize()
66 .unwrap_or_else(|_| std::env::temp_dir());
67 if normalized_project_path.starts_with(&temp_dir) {
68 tracing::warn!(
69 "Skipping Dashboard registration for temporary path: {}",
70 normalized_project_path.display()
71 );
72 return Ok(()); }
74
75 let project_info = ProjectInfo {
77 path: normalized_project_path.to_string_lossy().to_string(),
78 name: project_name,
79 db_path: normalized_db_path.to_string_lossy().to_string(),
80 agent,
81 };
82
83 let url = "ws://127.0.0.1:11391/ws/mcp";
85 let (ws_stream, _) = connect_async(url)
86 .await
87 .context("Failed to connect to Dashboard WebSocket")?;
88
89 tracing::debug!("Connected to Dashboard at {}", url);
90
91 let (mut write, mut read) = ws_stream.split();
92
93 let register_msg = McpMessage::Register {
95 project: project_info.clone(),
96 };
97 let register_json = serde_json::to_string(®ister_msg)?;
98 write
99 .send(Message::Text(register_json))
100 .await
101 .context("Failed to send register message")?;
102
103 if let Some(Ok(Message::Text(text))) = read.next().await {
105 match serde_json::from_str::<DashboardResponse>(&text) {
106 Ok(DashboardResponse::Registered { success: true }) => {
107 tracing::debug!("Successfully registered with Dashboard");
108 },
109 Ok(DashboardResponse::Registered { success: false }) => {
110 anyhow::bail!("Dashboard rejected registration");
111 },
112 _ => {
113 tracing::debug!("Unexpected response during registration: {}", text);
114 },
115 }
116 }
117
118 let mut write_clone = write;
120 tokio::spawn(async move {
121 let mut interval = tokio::time::interval(Duration::from_secs(30));
122
123 loop {
124 interval.tick().await;
125
126 let ping_msg = McpMessage::Ping;
127 if let Ok(ping_json) = serde_json::to_string(&ping_msg) {
128 if write_clone.send(Message::Text(ping_json)).await.is_err() {
129 tracing::warn!("Failed to send ping - Dashboard connection lost");
130 break;
131 }
132 }
133 }
134 });
135
136 tokio::spawn(async move {
138 while let Some(Ok(msg)) = read.next().await {
139 match msg {
140 Message::Text(text) => match serde_json::from_str::<DashboardResponse>(&text) {
141 Ok(DashboardResponse::Pong) => {
142 tracing::debug!("Received pong from Dashboard");
143 },
144 _ => {
145 tracing::debug!("Received message from Dashboard: {}", text);
146 },
147 },
148 Message::Close(_) => {
149 tracing::info!("Dashboard closed connection");
150 break;
151 },
152 _ => {},
153 }
154 }
155 });
156
157 Ok(())
158}