1use crate::mcp::{
8 create_sse_server_config, create_stdio_server_config, McpConfig, McpServerType, SdkMcpManager,
9};
10use anyhow::{anyhow, Result};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::path::PathBuf;
14use tokio::io::{AsyncReadExt, AsyncWriteExt};
15#[cfg(all(unix, feature = "unix-sockets"))]
16use tokio::net::{UnixListener, UnixStream};
17
18#[cfg(windows)]
23use std::io;
24
25#[derive(Debug, Serialize, Deserialize)]
26pub enum DaemonRequest {
27 ListTools {
28 server_name: String,
29 },
30 CallTool {
31 server_name: String,
32 tool_name: String,
33 arguments: serde_json::Value,
34 },
35 EnsureServerConnected {
36 server_name: String,
37 },
38 CloseServer {
39 server_name: String,
40 },
41 ListConnectedServers,
42 Shutdown,
43}
44
45#[derive(Debug, Serialize, Deserialize)]
46pub enum DaemonResponse {
47 Tools(HashMap<String, Vec<rmcp::model::Tool>>),
48 ToolResult(serde_json::Value),
49 ServerConnected,
50 ServerClosed,
51 ConnectedServers(Vec<String>),
52 Success,
53 Error(String),
54}
55
56#[cfg(all(unix, feature = "unix-sockets"))]
57pub struct McpDaemon {
58 manager: SdkMcpManager,
59 socket_path: PathBuf,
60}
61
62#[cfg(windows)]
63pub struct McpDaemon {
64 _phantom: std::marker::PhantomData<()>,
65}
66
67#[cfg(all(unix, not(feature = "unix-sockets")))]
69pub struct McpDaemon {
70 _phantom: std::marker::PhantomData<()>,
71}
72
73#[cfg(all(unix, not(feature = "unix-sockets")))]
74impl McpDaemon {
75 pub fn new() -> Result<Self> {
79 Err(anyhow::anyhow!(
80 "MCP daemon functionality requires the 'unix-sockets' feature to be enabled. \
81 Enable it in Cargo.toml or use direct MCP connections without the daemon."
82 ))
83 }
84
85 pub fn get_socket_path() -> Result<PathBuf> {
89 Err(anyhow::anyhow!(
90 "Unix socket functionality requires the 'unix-sockets' feature to be enabled"
91 ))
92 }
93
94 pub async fn start(&mut self) -> Result<()> {
98 Err(anyhow::anyhow!(
99 "MCP daemon requires the 'unix-sockets' feature to be enabled"
100 ))
101 }
102}
103
104#[cfg(windows)]
105pub struct McpDaemon {
106 _phantom: std::marker::PhantomData<()>,
107}
108
109#[cfg(windows)]
110impl McpDaemon {
111 pub fn new() -> Result<Self> {
116 Err(anyhow::anyhow!(
117 "MCP daemon functionality is not supported on Windows. \
118 The daemon requires Unix domain sockets which are not available on Windows. \
119 Consider using direct MCP connections without the daemon."
120 ))
121 }
122
123 pub fn get_socket_path() -> Result<PathBuf> {
127 Err(anyhow::anyhow!(
128 "Unix socket paths are not supported on Windows"
129 ))
130 }
131
132 pub async fn start(&mut self) -> Result<()> {
136 Err(anyhow::anyhow!(
137 "MCP daemon cannot be started on Windows due to lack of Unix socket support"
138 ))
139 }
140}
141
142#[cfg(all(unix, feature = "unix-sockets"))]
143impl McpDaemon {
144 pub fn new() -> Result<Self> {
145 let socket_path = Self::get_socket_path()?;
146 Ok(Self {
147 manager: SdkMcpManager::new(),
148 socket_path,
149 })
150 }
151
152 pub fn get_socket_path() -> Result<PathBuf> {
153 let config_dir = crate::config::Config::config_dir()?;
154 Ok(config_dir.join("mcp_daemon.sock"))
155 }
156
157 pub async fn start(&mut self) -> Result<()> {
158 if self.socket_path.exists() {
160 std::fs::remove_file(&self.socket_path)?;
161 }
162
163 let listener = UnixListener::bind(&self.socket_path)?;
164 crate::debug_log!("MCP Daemon started, listening on {:?}", self.socket_path);
165
166 loop {
167 match listener.accept().await {
168 Ok((stream, _)) => {
169 if let Err(e) = self.handle_client(stream).await {
171 crate::debug_log!("Error handling client: {}", e);
172 }
173 }
174 Err(e) => {
175 crate::debug_log!("Error accepting connection: {}", e);
176 }
177 }
178 }
179 }
180
181 async fn handle_client(&mut self, mut stream: UnixStream) -> Result<()> {
182 let mut buffer = vec![0; 32768];
184
185 let n = tokio::time::timeout(std::time::Duration::from_secs(30), stream.read(&mut buffer))
187 .await??;
188
189 if n == 0 {
190 return Ok(());
191 }
192
193 let request_data = buffer[..n].to_vec();
195 let request: DaemonRequest =
196 tokio::task::spawn_blocking(move || serde_json::from_slice(&request_data)).await??;
197
198 crate::debug_log!("Daemon received request: {:?}", request);
199
200 let response = self.process_request(request).await;
201
202 let response_data =
204 tokio::task::spawn_blocking(move || serde_json::to_vec(&response)).await??;
205
206 let response_len = response_data.len() as u32;
208 tokio::time::timeout(std::time::Duration::from_secs(30), async {
209 stream.write_all(&response_len.to_le_bytes()).await?;
210 stream.write_all(&response_data).await?;
211 stream.flush().await
212 })
213 .await??;
214
215 Ok(())
216 }
217
218 async fn process_request(&mut self, request: DaemonRequest) -> DaemonResponse {
219 match request {
220 DaemonRequest::EnsureServerConnected { server_name } => {
221 match self.ensure_server_connected(&server_name).await {
222 Ok(_) => DaemonResponse::ServerConnected,
223 Err(e) => DaemonResponse::Error(e.to_string()),
224 }
225 }
226 DaemonRequest::ListTools { server_name } => {
227 if let Err(e) = self.ensure_server_connected(&server_name).await {
229 return DaemonResponse::Error(format!(
230 "Failed to connect to server '{}': {}",
231 server_name, e
232 ));
233 }
234
235 match self.manager.list_all_tools().await {
236 Ok(tools) => {
237 if let Some(server_tools) = tools.get(&server_name) {
238 let mut result = HashMap::new();
239 result.insert(server_name, server_tools.clone());
240 DaemonResponse::Tools(result)
241 } else {
242 DaemonResponse::Tools(HashMap::new())
243 }
244 }
245 Err(e) => DaemonResponse::Error(e.to_string()),
246 }
247 }
248 DaemonRequest::CallTool {
249 server_name,
250 tool_name,
251 arguments,
252 } => {
253 match self
254 .manager
255 .call_tool(&server_name, &tool_name, arguments)
256 .await
257 {
258 Ok(result) => DaemonResponse::ToolResult(result),
259 Err(e) => DaemonResponse::Error(e.to_string()),
260 }
261 }
262 DaemonRequest::CloseServer { server_name } => {
263 if let Some(client) = self.manager.clients.remove(&server_name) {
265 let _ = client.cancel().await;
266 crate::debug_log!("Daemon closed connection to MCP server '{}'", server_name);
267 DaemonResponse::ServerClosed
268 } else {
269 DaemonResponse::Error(format!("Server '{}' not found", server_name))
270 }
271 }
272 DaemonRequest::ListConnectedServers => {
273 let servers: Vec<String> = self.manager.clients.keys().cloned().collect();
274 DaemonResponse::ConnectedServers(servers)
275 }
276 DaemonRequest::Shutdown => {
277 crate::debug_log!("Daemon shutdown requested");
278 std::process::exit(0);
279 }
280 }
281 }
282
283 async fn ensure_server_connected(&mut self, server_name: &str) -> Result<()> {
284 if self.manager.clients.contains_key(server_name) {
286 crate::debug_log!(
287 "DAEMON: MCP server '{}' already connected. Total connections: {}",
288 server_name,
289 self.manager.clients.len()
290 );
291 return Ok(());
292 }
293
294 crate::debug_log!(
295 "DAEMON: Loading MCP configuration for server '{}'",
296 server_name
297 );
298
299 let config = McpConfig::load()?;
301 if let Some(server_config) = config.get_server(server_name) {
302 crate::debug_log!(
303 "DAEMON: Found server config for '{}': {:?} ({})",
304 server_name,
305 server_config.server_type,
306 server_config.command_or_url
307 );
308
309 let sdk_config = match server_config.server_type {
310 McpServerType::Stdio => {
311 let parts: Vec<String> = server_config
312 .command_or_url
313 .split_whitespace()
314 .map(|s| s.to_string())
315 .collect();
316 crate::debug_log!(
317 "DAEMON: Creating STDIO config with command parts: {:?}",
318 parts
319 );
320 let env = if server_config.env.is_empty() {
321 crate::debug_log!("DAEMON: No environment variables to add");
322 None
323 } else {
324 crate::debug_log!(
325 "DAEMON: Adding {} environment variables",
326 server_config.env.len()
327 );
328 for (key, value) in &server_config.env {
329 crate::debug_log!("DAEMON: Env var: {}={}", key, value);
330 }
331 Some(server_config.env.clone())
332 };
333 create_stdio_server_config(server_name.to_string(), parts, env, None)
334 }
335 McpServerType::Sse => {
336 crate::debug_log!(
337 "DAEMON: Creating SSE config with URL: {}",
338 server_config.command_or_url
339 );
340 create_sse_server_config(
341 server_name.to_string(),
342 server_config.command_or_url.clone(),
343 )
344 }
345 McpServerType::Streamable => {
346 crate::debug_log!(
347 "DAEMON: Creating Streamable config (treating as SSE) with URL: {}",
348 server_config.command_or_url
349 );
350 create_sse_server_config(
352 server_name.to_string(),
353 server_config.command_or_url.clone(),
354 )
355 }
356 };
357
358 crate::debug_log!(
359 "DAEMON: Attempting to connect to MCP server '{}'",
360 server_name
361 );
362 match self.manager.add_server(sdk_config).await {
363 Ok(_) => {
364 crate::debug_log!(
365 "DAEMON: Successfully connected to MCP server '{}'. Total connections: {}",
366 server_name,
367 self.manager.clients.len()
368 );
369 Ok(())
370 }
371 Err(e) => {
372 crate::debug_log!(
373 "DAEMON: Failed to connect to MCP server '{}': {}",
374 server_name,
375 e
376 );
377 Err(e)
378 }
379 }
380 } else {
381 crate::debug_log!(
382 "DAEMON: Server '{}' not found in configuration",
383 server_name
384 );
385 Err(anyhow!(
386 "MCP server '{}' not found in configuration",
387 server_name
388 ))
389 }
390 }
391}
392
393#[cfg(all(unix, feature = "unix-sockets"))]
395pub struct DaemonClient {
396 socket_path: PathBuf,
397}
398
399#[cfg(all(unix, not(feature = "unix-sockets")))]
401pub struct DaemonClient {
402 _phantom: std::marker::PhantomData<()>,
403}
404
405#[cfg(windows)]
406pub struct DaemonClient {
407 _phantom: std::marker::PhantomData<()>,
408}
409
410#[cfg(all(unix, not(feature = "unix-sockets")))]
411impl DaemonClient {
412 pub fn new() -> Result<Self> {
416 Err(anyhow::anyhow!(
417 "MCP daemon client requires the 'unix-sockets' feature to be enabled"
418 ))
419 }
420
421 pub async fn is_daemon_running(&self) -> bool {
425 false
426 }
427
428 pub async fn start_daemon_if_needed(&self) -> Result<()> {
432 Err(anyhow::anyhow!(
433 "Cannot start MCP daemon - 'unix-sockets' feature is required"
434 ))
435 }
436
437 pub async fn send_request(&self, _request: DaemonRequest) -> Result<DaemonResponse> {
441 Err(anyhow::anyhow!(
442 "Cannot communicate with MCP daemon - 'unix-sockets' feature is required"
443 ))
444 }
445
446 pub async fn ensure_server_connected(&self, _server_name: &str) -> Result<()> {
450 Err(anyhow::anyhow!(
451 "MCP daemon server connections require the 'unix-sockets' feature"
452 ))
453 }
454
455 pub async fn call_tool(
459 &self,
460 _server_name: &str,
461 _tool_name: &str,
462 _arguments: serde_json::Value,
463 ) -> Result<serde_json::Value> {
464 Err(anyhow::anyhow!(
465 "MCP daemon tool calls require the 'unix-sockets' feature"
466 ))
467 }
468
469 pub async fn list_tools(
473 &self,
474 _server_name: &str,
475 ) -> Result<HashMap<String, Vec<rmcp::model::Tool>>> {
476 Err(anyhow::anyhow!(
477 "MCP daemon tool listing requires the 'unix-sockets' feature"
478 ))
479 }
480
481 pub async fn close_server(&self, _server_name: &str) -> Result<()> {
485 Err(anyhow::anyhow!(
486 "MCP daemon server closing requires the 'unix-sockets' feature"
487 ))
488 }
489}
490
491#[cfg(windows)]
492impl DaemonClient {
493 pub fn new() -> Result<Self> {
497 Err(anyhow::anyhow!(
498 "MCP daemon client is not supported on Windows"
499 ))
500 }
501
502 pub async fn is_daemon_running(&self) -> bool {
506 false
507 }
508
509 pub async fn start_daemon_if_needed(&self) -> Result<()> {
513 Err(anyhow::anyhow!(
514 "Cannot start MCP daemon on Windows - Unix sockets not supported"
515 ))
516 }
517
518 pub async fn send_request(&self, _request: DaemonRequest) -> Result<DaemonResponse> {
522 Err(anyhow::anyhow!(
523 "Cannot communicate with MCP daemon on Windows"
524 ))
525 }
526
527 pub async fn ensure_server_connected(&self, _server_name: &str) -> Result<()> {
531 Err(anyhow::anyhow!(
532 "MCP daemon server connections not supported on Windows"
533 ))
534 }
535
536 pub async fn call_tool(
540 &self,
541 _server_name: &str,
542 _tool_name: &str,
543 _arguments: serde_json::Value,
544 ) -> Result<serde_json::Value> {
545 Err(anyhow::anyhow!(
546 "MCP daemon tool calls not supported on Windows"
547 ))
548 }
549
550 pub async fn list_tools(
554 &self,
555 _server_name: &str,
556 ) -> Result<HashMap<String, Vec<rmcp::model::Tool>>> {
557 Err(anyhow::anyhow!(
558 "MCP daemon tool listing not supported on Windows"
559 ))
560 }
561
562 pub async fn close_server(&self, _server_name: &str) -> Result<()> {
566 Err(anyhow::anyhow!(
567 "MCP daemon server closing not supported on Windows"
568 ))
569 }
570}
571
572#[cfg(all(unix, feature = "unix-sockets"))]
573impl DaemonClient {
574 pub fn new() -> Result<Self> {
575 Ok(Self {
576 socket_path: McpDaemon::get_socket_path()?,
577 })
578 }
579
580 pub async fn is_daemon_running(&self) -> bool {
581 self.socket_path.exists()
582 && self
583 .send_request(DaemonRequest::ListConnectedServers)
584 .await
585 .is_ok()
586 }
587
588 pub async fn start_daemon_if_needed(&self) -> Result<()> {
589 if !self.is_daemon_running().await {
590 crate::debug_log!("Starting MCP daemon...");
591
592 let daemon_binary = std::env::current_exe()?;
594 tokio::process::Command::new(daemon_binary)
595 .arg("--mcp-daemon")
596 .spawn()?;
597
598 tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
600
601 let mut retries = 10;
603 while retries > 0 && !self.is_daemon_running().await {
604 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
605 retries -= 1;
606 }
607
608 if !self.is_daemon_running().await {
609 return Err(anyhow!("Failed to start MCP daemon"));
610 }
611
612 crate::debug_log!("MCP daemon started successfully");
613 }
614 Ok(())
615 }
616
617 pub async fn send_request(&self, request: DaemonRequest) -> Result<DaemonResponse> {
618 let mut stream = UnixStream::connect(&self.socket_path).await?;
619
620 let request_data = serde_json::to_vec(&request)?;
621 stream.write_all(&request_data).await?;
622 stream.flush().await?;
623
624 let mut len_buffer = [0u8; 4];
626 stream.read_exact(&mut len_buffer).await?;
627 let response_len = u32::from_le_bytes(len_buffer) as usize;
628
629 let mut response_buffer = vec![0; response_len];
631 stream.read_exact(&mut response_buffer).await?;
632
633 let response: DaemonResponse = serde_json::from_slice(&response_buffer)?;
634 Ok(response)
635 }
636
637 pub async fn ensure_server_connected(&self, server_name: &str) -> Result<()> {
638 self.start_daemon_if_needed().await?;
639
640 match self
641 .send_request(DaemonRequest::EnsureServerConnected {
642 server_name: server_name.to_string(),
643 })
644 .await?
645 {
646 DaemonResponse::ServerConnected => Ok(()),
647 DaemonResponse::Error(e) => Err(anyhow!(e)),
648 _ => Err(anyhow!("Unexpected response from daemon")),
649 }
650 }
651
652 pub async fn call_tool(
653 &self,
654 server_name: &str,
655 tool_name: &str,
656 arguments: serde_json::Value,
657 ) -> Result<serde_json::Value> {
658 match self
659 .send_request(DaemonRequest::CallTool {
660 server_name: server_name.to_string(),
661 tool_name: tool_name.to_string(),
662 arguments,
663 })
664 .await?
665 {
666 DaemonResponse::ToolResult(result) => Ok(result),
667 DaemonResponse::Error(e) => Err(anyhow!(e)),
668 _ => Err(anyhow!("Unexpected response from daemon")),
669 }
670 }
671
672 pub async fn list_tools(
673 &self,
674 server_name: &str,
675 ) -> Result<HashMap<String, Vec<rmcp::model::Tool>>> {
676 crate::debug_log!(
677 "DaemonClient: Requesting tools for server '{}'",
678 server_name
679 );
680 match self
681 .send_request(DaemonRequest::ListTools {
682 server_name: server_name.to_string(),
683 })
684 .await?
685 {
686 DaemonResponse::Tools(tools) => {
687 crate::debug_log!(
688 "DaemonClient: Received tools response with {} servers",
689 tools.len()
690 );
691 for (name, server_tools) in &tools {
692 crate::debug_log!(
693 "DaemonClient: Server '{}' has {} tools",
694 name,
695 server_tools.len()
696 );
697 }
698 Ok(tools)
699 }
700 DaemonResponse::Error(e) => {
701 crate::debug_log!("DaemonClient: Received error response: {}", e);
702 Err(anyhow!(e))
703 }
704 response => {
705 crate::debug_log!("DaemonClient: Received unexpected response: {:?}", response);
706 Err(anyhow!("Unexpected response from daemon"))
707 }
708 }
709 }
710
711 pub async fn close_server(&self, server_name: &str) -> Result<()> {
712 match self
713 .send_request(DaemonRequest::CloseServer {
714 server_name: server_name.to_string(),
715 })
716 .await?
717 {
718 DaemonResponse::ServerClosed => Ok(()),
719 DaemonResponse::Error(e) => Err(anyhow!(e)),
720 _ => Err(anyhow!("Unexpected response from daemon")),
721 }
722 }
723}