1#[cfg(all(unix, feature = "unix-sockets"))]
8use crate::mcp::{
9 create_sse_server_config, create_stdio_server_config, McpConfig, McpServerType, SdkMcpManager,
10};
11#[cfg(all(unix, feature = "unix-sockets"))]
12use anyhow::anyhow;
13use anyhow::Result;
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::path::PathBuf;
17#[cfg(all(unix, feature = "unix-sockets"))]
18use tokio::io::{AsyncReadExt, AsyncWriteExt};
19#[cfg(all(unix, feature = "unix-sockets"))]
20use tokio::net::{UnixListener, UnixStream};
21
22#[derive(Debug, Serialize, Deserialize)]
23pub enum DaemonRequest {
24 ListTools {
25 server_name: String,
26 },
27 CallTool {
28 server_name: String,
29 tool_name: String,
30 arguments: serde_json::Value,
31 },
32 EnsureServerConnected {
33 server_name: String,
34 },
35 CloseServer {
36 server_name: String,
37 },
38 ListConnectedServers,
39 Shutdown,
40}
41
42#[derive(Debug, Serialize, Deserialize)]
43pub enum DaemonResponse {
44 Tools(HashMap<String, Vec<rmcp::model::Tool>>),
45 ToolResult(serde_json::Value),
46 ServerConnected,
47 ServerClosed,
48 ConnectedServers(Vec<String>),
49 Success,
50 Error(String),
51}
52
53#[cfg(all(unix, feature = "unix-sockets"))]
54pub struct McpDaemon {
55 manager: SdkMcpManager,
56 socket_path: PathBuf,
57}
58
59#[cfg(all(unix, not(feature = "unix-sockets")))]
61pub struct McpDaemon {
62 _phantom: std::marker::PhantomData<()>,
63}
64
65#[cfg(windows)]
66pub struct McpDaemon {
67 _phantom: std::marker::PhantomData<()>,
68}
69
70#[cfg(all(unix, not(feature = "unix-sockets")))]
71impl McpDaemon {
72 pub fn new() -> Result<Self> {
76 Err(anyhow::anyhow!(
77 "MCP daemon functionality requires the 'unix-sockets' feature to be enabled. \
78 Enable it in Cargo.toml or use direct MCP connections without the daemon."
79 ))
80 }
81
82 pub fn get_socket_path() -> Result<PathBuf> {
86 Err(anyhow::anyhow!(
87 "Unix socket functionality requires the 'unix-sockets' feature to be enabled"
88 ))
89 }
90
91 pub async fn start(&mut self) -> Result<()> {
95 Err(anyhow::anyhow!(
96 "MCP daemon requires the 'unix-sockets' feature to be enabled"
97 ))
98 }
99}
100
101#[cfg(windows)]
102impl McpDaemon {
103 #[allow(dead_code)]
108 pub fn new() -> Result<Self> {
109 Err(anyhow::anyhow!(
110 "MCP daemon functionality is not supported on Windows. \
111 The daemon requires Unix domain sockets which are not available on Windows. \
112 Consider using direct MCP connections without the daemon."
113 ))
114 }
115
116 #[allow(dead_code)]
120 pub fn get_socket_path() -> Result<PathBuf> {
121 Err(anyhow::anyhow!(
122 "Unix socket paths are not supported on Windows"
123 ))
124 }
125
126 #[allow(dead_code)]
130 pub async fn start(&mut self) -> Result<()> {
131 Err(anyhow::anyhow!(
132 "MCP daemon cannot be started on Windows due to lack of Unix socket support"
133 ))
134 }
135}
136
137#[cfg(all(unix, feature = "unix-sockets"))]
138impl McpDaemon {
139 pub fn new() -> Result<Self> {
140 let socket_path = Self::get_socket_path()?;
141 Ok(Self {
142 manager: SdkMcpManager::new(),
143 socket_path,
144 })
145 }
146
147 pub fn get_socket_path() -> Result<PathBuf> {
148 let config_dir = crate::config::Config::config_dir()?;
149 Ok(config_dir.join("mcp_daemon.sock"))
150 }
151
152 pub async fn start(&mut self) -> Result<()> {
153 if self.socket_path.exists() {
155 tokio::fs::remove_file(&self.socket_path).await?;
156 }
157
158 let listener = UnixListener::bind(&self.socket_path)?;
159 crate::debug_log!("MCP Daemon started, listening on {:?}", self.socket_path);
160
161 loop {
162 match listener.accept().await {
163 Ok((stream, _)) => {
164 if let Err(e) = self.handle_client(stream).await {
166 crate::debug_log!("Error handling client: {}", e);
167 }
168 }
169 Err(e) => {
170 crate::debug_log!("Error accepting connection: {}", e);
171 }
172 }
173 }
174 }
175
176 async fn handle_client(&mut self, mut stream: UnixStream) -> Result<()> {
177 let mut buffer = vec![0; 32768];
179
180 let n = tokio::time::timeout(std::time::Duration::from_secs(30), stream.read(&mut buffer))
182 .await??;
183
184 if n == 0 {
185 return Ok(());
186 }
187
188 let request_data = buffer[..n].to_vec();
190 let request: DaemonRequest =
191 tokio::task::spawn_blocking(move || serde_json::from_slice(&request_data)).await??;
192
193 crate::debug_log!("Daemon received request: {:?}", request);
194
195 let response = self.process_request(request).await;
196
197 let response_data =
199 tokio::task::spawn_blocking(move || serde_json::to_vec(&response)).await??;
200
201 let response_len = response_data.len() as u32;
203 tokio::time::timeout(std::time::Duration::from_secs(30), async {
204 stream.write_all(&response_len.to_le_bytes()).await?;
205 stream.write_all(&response_data).await?;
206 stream.flush().await
207 })
208 .await??;
209
210 Ok(())
211 }
212
213 async fn process_request(&mut self, request: DaemonRequest) -> DaemonResponse {
214 match request {
215 DaemonRequest::EnsureServerConnected { server_name } => {
216 match self.ensure_server_connected(&server_name).await {
217 Ok(_) => DaemonResponse::ServerConnected,
218 Err(e) => DaemonResponse::Error(e.to_string()),
219 }
220 }
221 DaemonRequest::ListTools { server_name } => {
222 if let Err(e) = self.ensure_server_connected(&server_name).await {
224 return DaemonResponse::Error(format!(
225 "Failed to connect to server '{}': {}",
226 server_name, e
227 ));
228 }
229
230 match self.manager.list_all_tools().await {
231 Ok(tools) => {
232 if let Some(server_tools) = tools.get(&server_name) {
233 let mut result = HashMap::new();
234 result.insert(server_name, server_tools.clone());
235 DaemonResponse::Tools(result)
236 } else {
237 DaemonResponse::Tools(HashMap::new())
238 }
239 }
240 Err(e) => DaemonResponse::Error(e.to_string()),
241 }
242 }
243 DaemonRequest::CallTool {
244 server_name,
245 tool_name,
246 arguments,
247 } => {
248 match self
249 .manager
250 .call_tool(&server_name, &tool_name, arguments)
251 .await
252 {
253 Ok(result) => DaemonResponse::ToolResult(result),
254 Err(e) => DaemonResponse::Error(e.to_string()),
255 }
256 }
257 DaemonRequest::CloseServer { server_name } => {
258 if let Some(client) = self.manager.clients.remove(&server_name) {
260 let _ = client.cancel().await;
261 crate::debug_log!("Daemon closed connection to MCP server '{}'", server_name);
262 DaemonResponse::ServerClosed
263 } else {
264 DaemonResponse::Error(format!("Server '{}' not found", server_name))
265 }
266 }
267 DaemonRequest::ListConnectedServers => {
268 let servers: Vec<String> = self.manager.clients.keys().cloned().collect();
269 DaemonResponse::ConnectedServers(servers)
270 }
271 DaemonRequest::Shutdown => {
272 crate::debug_log!("Daemon shutdown requested");
273 std::process::exit(0);
274 }
275 }
276 }
277
278 async fn ensure_server_connected(&mut self, server_name: &str) -> Result<()> {
279 if self.manager.clients.contains_key(server_name) {
281 crate::debug_log!(
282 "DAEMON: MCP server '{}' already connected. Total connections: {}",
283 server_name,
284 self.manager.clients.len()
285 );
286 return Ok(());
287 }
288
289 crate::debug_log!(
290 "DAEMON: Loading MCP configuration for server '{}'",
291 server_name
292 );
293
294 let config = McpConfig::load().await?;
296 if let Some(server_config) = config.get_server(server_name) {
297 crate::debug_log!(
298 "DAEMON: Found server config for '{}': {:?} ({})",
299 server_name,
300 server_config.server_type,
301 server_config.command_or_url
302 );
303
304 let sdk_config = match server_config.server_type {
305 McpServerType::Stdio => {
306 let parts: Vec<String> = server_config
307 .command_or_url
308 .split_whitespace()
309 .map(|s| s.to_string())
310 .collect();
311 crate::debug_log!(
312 "DAEMON: Creating STDIO config with command parts: {:?}",
313 parts
314 );
315 let env = if server_config.env.is_empty() {
316 crate::debug_log!("DAEMON: No environment variables to add");
317 None
318 } else {
319 crate::debug_log!(
320 "DAEMON: Adding {} environment variables",
321 server_config.env.len()
322 );
323 for (key, value) in &server_config.env {
324 crate::debug_log!("DAEMON: Env var: {}={}", key, value);
325 }
326 Some(server_config.env.clone())
327 };
328 create_stdio_server_config(server_name.to_string(), parts, env, None)
329 }
330 McpServerType::Sse => {
331 crate::debug_log!(
332 "DAEMON: Creating SSE config with URL: {}",
333 server_config.command_or_url
334 );
335 create_sse_server_config(
336 server_name.to_string(),
337 server_config.command_or_url.clone(),
338 )
339 }
340 McpServerType::Streamable => {
341 crate::debug_log!(
342 "DAEMON: Creating Streamable config (treating as SSE) with URL: {}",
343 server_config.command_or_url
344 );
345 create_sse_server_config(
347 server_name.to_string(),
348 server_config.command_or_url.clone(),
349 )
350 }
351 };
352
353 crate::debug_log!(
354 "DAEMON: Attempting to connect to MCP server '{}'",
355 server_name
356 );
357 match self.manager.add_server(sdk_config).await {
358 Ok(_) => {
359 crate::debug_log!(
360 "DAEMON: Successfully connected to MCP server '{}'. Total connections: {}",
361 server_name,
362 self.manager.clients.len()
363 );
364 Ok(())
365 }
366 Err(e) => {
367 crate::debug_log!(
368 "DAEMON: Failed to connect to MCP server '{}': {}",
369 server_name,
370 e
371 );
372 Err(e)
373 }
374 }
375 } else {
376 crate::debug_log!(
377 "DAEMON: Server '{}' not found in configuration",
378 server_name
379 );
380 Err(anyhow!(
381 "MCP server '{}' not found in configuration",
382 server_name
383 ))
384 }
385 }
386}
387
388#[cfg(all(unix, feature = "unix-sockets"))]
390pub struct DaemonClient {
391 socket_path: PathBuf,
392}
393
394#[cfg(all(unix, not(feature = "unix-sockets")))]
396pub struct DaemonClient {
397 _phantom: std::marker::PhantomData<()>,
398}
399
400#[cfg(windows)]
401pub struct DaemonClient {
402 _phantom: std::marker::PhantomData<()>,
403}
404
405#[cfg(all(unix, not(feature = "unix-sockets")))]
406impl DaemonClient {
407 pub fn new() -> Result<Self> {
411 Err(anyhow::anyhow!(
412 "MCP daemon client requires the 'unix-sockets' feature to be enabled"
413 ))
414 }
415
416 pub async fn is_daemon_running(&self) -> bool {
420 false
421 }
422
423 pub async fn start_daemon_if_needed(&self) -> Result<()> {
427 Err(anyhow::anyhow!(
428 "Cannot start MCP daemon - 'unix-sockets' feature is required"
429 ))
430 }
431
432 pub async fn send_request(&self, _request: DaemonRequest) -> Result<DaemonResponse> {
436 Err(anyhow::anyhow!(
437 "Cannot communicate with MCP daemon - 'unix-sockets' feature is required"
438 ))
439 }
440
441 pub async fn ensure_server_connected(&self, _server_name: &str) -> Result<()> {
445 Err(anyhow::anyhow!(
446 "MCP daemon server connections require the 'unix-sockets' feature"
447 ))
448 }
449
450 pub async fn call_tool(
454 &self,
455 _server_name: &str,
456 _tool_name: &str,
457 _arguments: serde_json::Value,
458 ) -> Result<serde_json::Value> {
459 Err(anyhow::anyhow!(
460 "MCP daemon tool calls require the 'unix-sockets' feature"
461 ))
462 }
463
464 pub async fn list_tools(
468 &self,
469 _server_name: &str,
470 ) -> Result<HashMap<String, Vec<rmcp::model::Tool>>> {
471 Err(anyhow::anyhow!(
472 "MCP daemon tool listing requires the 'unix-sockets' feature"
473 ))
474 }
475
476 pub async fn close_server(&self, _server_name: &str) -> Result<()> {
480 Err(anyhow::anyhow!(
481 "MCP daemon server closing requires the 'unix-sockets' feature"
482 ))
483 }
484}
485
486#[cfg(windows)]
487impl DaemonClient {
488 pub fn new() -> Result<Self> {
492 Err(anyhow::anyhow!(
493 "MCP daemon client is not supported on Windows"
494 ))
495 }
496
497 #[allow(dead_code)]
501 pub async fn is_daemon_running(&self) -> bool {
502 false
503 }
504
505 #[allow(dead_code)]
509 pub async fn start_daemon_if_needed(&self) -> Result<()> {
510 Err(anyhow::anyhow!(
511 "Cannot start MCP daemon on Windows - Unix sockets not supported"
512 ))
513 }
514
515 #[allow(dead_code)]
519 pub async fn send_request(&self, _request: DaemonRequest) -> Result<DaemonResponse> {
520 Err(anyhow::anyhow!(
521 "Cannot communicate with MCP daemon on Windows"
522 ))
523 }
524
525 pub async fn ensure_server_connected(&self, _server_name: &str) -> Result<()> {
529 Err(anyhow::anyhow!(
530 "MCP daemon server connections not supported on Windows"
531 ))
532 }
533
534 pub async fn call_tool(
538 &self,
539 _server_name: &str,
540 _tool_name: &str,
541 _arguments: serde_json::Value,
542 ) -> Result<serde_json::Value> {
543 Err(anyhow::anyhow!(
544 "MCP daemon tool calls not supported on Windows"
545 ))
546 }
547
548 pub async fn list_tools(
552 &self,
553 _server_name: &str,
554 ) -> Result<HashMap<String, Vec<rmcp::model::Tool>>> {
555 Err(anyhow::anyhow!(
556 "MCP daemon tool listing not supported on Windows"
557 ))
558 }
559
560 pub async fn close_server(&self, _server_name: &str) -> Result<()> {
564 Err(anyhow::anyhow!(
565 "MCP daemon server closing not supported on Windows"
566 ))
567 }
568}
569
570#[cfg(all(unix, feature = "unix-sockets"))]
571impl DaemonClient {
572 pub fn new() -> Result<Self> {
573 Ok(Self {
574 socket_path: McpDaemon::get_socket_path()?,
575 })
576 }
577
578 pub async fn is_daemon_running(&self) -> bool {
579 self.socket_path.exists()
580 && self
581 .send_request(DaemonRequest::ListConnectedServers)
582 .await
583 .is_ok()
584 }
585
586 pub async fn start_daemon_if_needed(&self) -> Result<()> {
587 if !self.is_daemon_running().await {
588 crate::debug_log!("Starting MCP daemon...");
589
590 let daemon_binary = std::env::current_exe()?;
592 tokio::process::Command::new(daemon_binary)
593 .arg("--mcp-daemon")
594 .spawn()?;
595
596 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
598
599 let mut retries = 10;
601 while retries > 0 && !self.is_daemon_running().await {
602 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
603 retries -= 1;
604 }
605
606 if !self.is_daemon_running().await {
607 return Err(anyhow!("Failed to start MCP daemon"));
608 }
609
610 crate::debug_log!("MCP daemon started successfully");
611 }
612 Ok(())
613 }
614
615 pub async fn send_request(&self, request: DaemonRequest) -> Result<DaemonResponse> {
616 let mut stream = UnixStream::connect(&self.socket_path).await?;
617
618 let request_data = serde_json::to_vec(&request)?;
619 stream.write_all(&request_data).await?;
620 stream.flush().await?;
621
622 let mut len_buffer = [0u8; 4];
624 stream.read_exact(&mut len_buffer).await?;
625 let response_len = u32::from_le_bytes(len_buffer) as usize;
626
627 let mut response_buffer = vec![0; response_len];
629 stream.read_exact(&mut response_buffer).await?;
630
631 let response: DaemonResponse = serde_json::from_slice(&response_buffer)?;
632 Ok(response)
633 }
634
635 pub async fn ensure_server_connected(&self, server_name: &str) -> Result<()> {
636 self.start_daemon_if_needed().await?;
637
638 match self
639 .send_request(DaemonRequest::EnsureServerConnected {
640 server_name: server_name.to_string(),
641 })
642 .await?
643 {
644 DaemonResponse::ServerConnected => Ok(()),
645 DaemonResponse::Error(e) => Err(anyhow!(e)),
646 _ => Err(anyhow!("Unexpected response from daemon")),
647 }
648 }
649
650 pub async fn call_tool(
651 &self,
652 server_name: &str,
653 tool_name: &str,
654 arguments: serde_json::Value,
655 ) -> Result<serde_json::Value> {
656 match self
657 .send_request(DaemonRequest::CallTool {
658 server_name: server_name.to_string(),
659 tool_name: tool_name.to_string(),
660 arguments,
661 })
662 .await?
663 {
664 DaemonResponse::ToolResult(result) => Ok(result),
665 DaemonResponse::Error(e) => Err(anyhow!(e)),
666 _ => Err(anyhow!("Unexpected response from daemon")),
667 }
668 }
669
670 pub async fn list_tools(
671 &self,
672 server_name: &str,
673 ) -> Result<HashMap<String, Vec<rmcp::model::Tool>>> {
674 crate::debug_log!(
675 "DaemonClient: Requesting tools for server '{}'",
676 server_name
677 );
678 match self
679 .send_request(DaemonRequest::ListTools {
680 server_name: server_name.to_string(),
681 })
682 .await?
683 {
684 DaemonResponse::Tools(tools) => {
685 crate::debug_log!(
686 "DaemonClient: Received tools response with {} servers",
687 tools.len()
688 );
689 for (name, server_tools) in &tools {
690 crate::debug_log!(
691 "DaemonClient: Server '{}' has {} tools",
692 name,
693 server_tools.len()
694 );
695 }
696 Ok(tools)
697 }
698 DaemonResponse::Error(e) => {
699 crate::debug_log!("DaemonClient: Received error response: {}", e);
700 Err(anyhow!(e))
701 }
702 response => {
703 crate::debug_log!("DaemonClient: Received unexpected response: {:?}", response);
704 Err(anyhow!("Unexpected response from daemon"))
705 }
706 }
707 }
708
709 pub async fn close_server(&self, server_name: &str) -> Result<()> {
710 match self
711 .send_request(DaemonRequest::CloseServer {
712 server_name: server_name.to_string(),
713 })
714 .await?
715 {
716 DaemonResponse::ServerClosed => Ok(()),
717 DaemonResponse::Error(e) => Err(anyhow!(e)),
718 _ => Err(anyhow!("Unexpected response from daemon")),
719 }
720 }
721}