intent_engine/mcp/
ws_client.rs1use anyhow::{Context, Result};
5use futures_util::{SinkExt, StreamExt};
6use serde::{Deserialize, Serialize};
7use std::path::PathBuf;
8use std::time::Duration;
9use tokio_tungstenite::{connect_async, tungstenite::Message};
10
11#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct ProjectInfo {
14 pub path: String,
15 pub name: String,
16 pub db_path: String,
17 #[serde(skip_serializing_if = "Option::is_none")]
18 pub agent: Option<String>,
19}
20
21#[derive(Debug, Serialize)]
23#[serde(tag = "type")]
24enum McpMessage {
25 #[serde(rename = "register")]
26 Register { project: ProjectInfo },
27 #[serde(rename = "ping")]
28 Ping,
29}
30
31#[derive(Debug, Deserialize)]
33#[serde(tag = "type")]
34enum DashboardResponse {
35 #[serde(rename = "registered")]
36 Registered { success: bool },
37 #[serde(rename = "pong")]
38 Pong,
39}
40
41pub async fn connect_to_dashboard(
44 project_path: PathBuf,
45 db_path: PathBuf,
46 agent: Option<String>,
47) -> Result<()> {
48 let project_name = project_path
50 .file_name()
51 .and_then(|n| n.to_str())
52 .unwrap_or("unknown")
53 .to_string();
54
55 let normalized_project_path = project_path
57 .canonicalize()
58 .unwrap_or_else(|_| project_path.clone());
59 let normalized_db_path = db_path.canonicalize().unwrap_or_else(|_| db_path.clone());
60
61 if normalized_project_path.starts_with("/tmp")
64 || normalized_project_path.starts_with(std::env::temp_dir())
65 {
66 tracing::warn!(
67 "Skipping Dashboard registration for temporary path: {}",
68 normalized_project_path.display()
69 );
70 return Ok(()); }
72
73 let project_info = ProjectInfo {
75 path: normalized_project_path.to_string_lossy().to_string(),
76 name: project_name,
77 db_path: normalized_db_path.to_string_lossy().to_string(),
78 agent,
79 };
80
81 let url = "ws://127.0.0.1:11391/ws/mcp";
83 let (ws_stream, _) = connect_async(url)
84 .await
85 .context("Failed to connect to Dashboard WebSocket")?;
86
87 tracing::debug!("Connected to Dashboard at {}", url);
88
89 let (mut write, mut read) = ws_stream.split();
90
91 let register_msg = McpMessage::Register {
93 project: project_info.clone(),
94 };
95 let register_json = serde_json::to_string(®ister_msg)?;
96 write
97 .send(Message::Text(register_json))
98 .await
99 .context("Failed to send register message")?;
100
101 if let Some(Ok(Message::Text(text))) = read.next().await {
103 match serde_json::from_str::<DashboardResponse>(&text) {
104 Ok(DashboardResponse::Registered { success: true }) => {
105 tracing::debug!("Successfully registered with Dashboard");
106 },
107 Ok(DashboardResponse::Registered { success: false }) => {
108 anyhow::bail!("Dashboard rejected registration");
109 },
110 _ => {
111 tracing::debug!("Unexpected response during registration: {}", text);
112 },
113 }
114 }
115
116 let mut write_clone = write;
118 tokio::spawn(async move {
119 let mut interval = tokio::time::interval(Duration::from_secs(30));
120
121 loop {
122 interval.tick().await;
123
124 let ping_msg = McpMessage::Ping;
125 if let Ok(ping_json) = serde_json::to_string(&ping_msg) {
126 if write_clone.send(Message::Text(ping_json)).await.is_err() {
127 tracing::warn!("Failed to send ping - Dashboard connection lost");
128 break;
129 }
130 }
131 }
132 });
133
134 tokio::spawn(async move {
136 while let Some(Ok(msg)) = read.next().await {
137 match msg {
138 Message::Text(text) => match serde_json::from_str::<DashboardResponse>(&text) {
139 Ok(DashboardResponse::Pong) => {
140 tracing::debug!("Received pong from Dashboard");
141 },
142 _ => {
143 tracing::debug!("Received message from Dashboard: {}", text);
144 },
145 },
146 Message::Close(_) => {
147 tracing::info!("Dashboard closed connection");
148 break;
149 },
150 _ => {},
151 }
152 }
153 });
154
155 Ok(())
156}