qudag_cli/mcp/
server.rs

1//! MCP server management implementation
2
3use crate::CliError;
4use qudag_mcp::{create_server, McpConfig, QuDAGMCPServer, ServerConfig};
5use std::path::PathBuf;
6use std::process::Stdio;
7use tracing::{error, info, warn};
8
9/// Start MCP server in foreground mode
10pub async fn start_mcp_server_foreground(
11    bind: String,
12    transport: String,
13    config_path: Option<PathBuf>,
14    verbose: bool,
15) -> Result<(), CliError> {
16    info!("Starting MCP server in foreground mode");
17
18    // Load or create MCP configuration
19    let mcp_config = if let Some(path) = config_path {
20        if path.exists() {
21            McpConfig::from_file(&path)
22                .map_err(|e| CliError::Config(format!("Failed to load config: {}", e)))?
23        } else {
24            return Err(CliError::Config(format!(
25                "Config file not found: {:?}",
26                path
27            )));
28        }
29    } else {
30        McpConfig::default()
31    };
32
33    // Parse bind address
34    let parsed_bind = if bind.contains(':') {
35        bind.clone()
36    } else {
37        format!("{}:{}", bind, 3000) // Default port
38    };
39
40    // Create transport configuration based on CLI parameter
41    let transport_config = match transport.as_str() {
42        "stdio" => qudag_mcp::TransportConfig::Stdio,
43        "http" => qudag_mcp::TransportConfig::Http {
44            server_url: format!("http://{}", parsed_bind),
45        },
46        "websocket" | "ws" => qudag_mcp::TransportConfig::WebSocket {
47            url: format!("ws://{}/mcp", parsed_bind),
48        },
49        _ => {
50            return Err(CliError::Config(format!(
51                "Unsupported transport type: {}",
52                transport
53            )));
54        }
55    };
56
57    // Create server configuration
58    let server_config = ServerConfig::new()
59        .with_server_info("QuDAG MCP Server", qudag_mcp::VERSION)
60        .with_transport(transport_config)
61        .with_log_level(if verbose { "debug" } else { "info" });
62
63    // For stdio transport, all output must go to stderr to avoid interfering with JSON-RPC
64    eprintln!("Starting QuDAG MCP Server");
65    eprintln!("=========================");
66    eprintln!("Bind Address: {}", parsed_bind);
67    eprintln!("Transport: {:?}", transport);
68    eprintln!("Server Name: {}", server_config.server_info.name);
69    eprintln!("Version: {}", server_config.server_info.version);
70
71    if verbose {
72        eprintln!("MCP Config Host: {}", mcp_config.server.host);
73        eprintln!("MCP Config Port: {}", mcp_config.server.port);
74        eprintln!(
75            "MCP Config Max Connections: {}",
76            mcp_config.server.max_connections
77        );
78        eprintln!(
79            "MCP Config Request Timeout: {}s",
80            mcp_config.server.request_timeout.as_secs()
81        );
82        eprintln!("MCP Config Auth Vault: {:?}", mcp_config.auth.vault_path);
83        eprintln!("MCP Config MFA Enabled: {}", mcp_config.auth.mfa_enabled);
84    }
85    eprintln!();
86
87    // Create and start server
88    let mut server = QuDAGMCPServer::new(server_config)
89        .await
90        .map_err(|e| CliError::Server(format!("Failed to create MCP server: {}", e)))?;
91
92    eprintln!("āœ“ MCP server initialized successfully");
93    eprintln!("āœ“ QuDAG tools and resources loaded");
94    eprintln!();
95
96    // Setup graceful shutdown
97    eprintln!("āœ“ Starting MCP server main loop");
98    eprintln!("  Press Ctrl+C to stop the server");
99    eprintln!();
100
101    // Setup signal handler
102    let ctrl_c = tokio::signal::ctrl_c();
103
104    tokio::select! {
105        result = server.run() => {
106            match result {
107                Ok(()) => {
108                    eprintln!("āœ“ MCP server stopped gracefully");
109                }
110                Err(e) => {
111                    error!("MCP server error: {}", e);
112                    return Err(CliError::Server(format!("MCP server error: {}", e)));
113                }
114            }
115        }
116        _ = ctrl_c => {
117            eprintln!("\nšŸ›‘ Shutting down MCP server...");
118            if let Err(e) = server.stop().await {
119                warn!("Error during shutdown: {}", e);
120            }
121            eprintln!("āœ“ MCP server stopped");
122        }
123    }
124
125    Ok(())
126}
127
128/// Start MCP server in background mode
129pub async fn start_mcp_server_background(
130    bind: String,
131    transport: String,
132    config_path: Option<PathBuf>,
133    verbose: bool,
134) -> Result<(), CliError> {
135    info!("Starting MCP server in background mode");
136
137    // Get current executable path
138    let current_exe = std::env::current_exe()
139        .map_err(|e| CliError::Config(format!("Failed to get current executable: {}", e)))?;
140
141    // Build command arguments
142    let mut args = vec![
143        "mcp".to_string(),
144        "start".to_string(),
145        "--bind".to_string(),
146        bind,
147        "--transport".to_string(),
148        transport,
149    ];
150
151    if let Some(config) = config_path {
152        args.push("--config".to_string());
153        args.push(config.to_string_lossy().to_string());
154    }
155
156    if verbose {
157        args.push("--verbose".to_string());
158    }
159
160    // Start background process
161    let mut cmd = tokio::process::Command::new(&current_exe);
162    cmd.args(&args)
163        .stdin(Stdio::null())
164        .stdout(Stdio::piped())
165        .stderr(Stdio::piped());
166
167    let child = cmd
168        .spawn()
169        .map_err(|e| CliError::Server(format!("Failed to start background MCP server: {}", e)))?;
170
171    let pid = child.id().unwrap_or(0);
172
173    // Save PID for later management
174    save_mcp_server_pid(pid).await?;
175
176    eprintln!("āœ“ MCP server started in background");
177    eprintln!("  Process ID: {}", pid);
178    eprintln!("  Use 'qudag mcp status' to check server status");
179    eprintln!("  Use 'qudag mcp stop' to stop the server");
180
181    Ok(())
182}
183
184/// Stop MCP server
185pub async fn stop_mcp_server(force: bool) -> Result<(), CliError> {
186    info!("Stopping MCP server");
187
188    // Try to get saved PID
189    match get_mcp_server_pid().await {
190        Ok(Some(pid)) => {
191            eprintln!("Stopping MCP server (PID: {})...", pid);
192
193            // Try graceful shutdown first
194            if !force {
195                if terminate_process(pid, false).await? {
196                    eprintln!("āœ“ MCP server stopped gracefully");
197                    clear_mcp_server_pid().await?;
198                    return Ok(());
199                }
200            }
201
202            // Force kill if graceful shutdown failed or force flag is set
203            if force || !terminate_process(pid, false).await? {
204                warn!("Graceful shutdown failed, force killing process");
205                if terminate_process(pid, true).await? {
206                    eprintln!("āœ“ MCP server force stopped");
207                } else {
208                    return Err(CliError::Server("Failed to stop MCP server".to_string()));
209                }
210            }
211
212            clear_mcp_server_pid().await?;
213        }
214        Ok(None) => {
215            eprintln!("No MCP server PID found");
216
217            // Try to find and kill any running MCP servers
218            if find_and_kill_mcp_processes(force).await? {
219                eprintln!("āœ“ Found and stopped running MCP server(s)");
220            } else {
221                eprintln!("No running MCP servers found");
222            }
223        }
224        Err(e) => {
225            warn!("Error getting MCP server PID: {}", e);
226
227            // Try to find and kill any running MCP servers
228            if find_and_kill_mcp_processes(force).await? {
229                eprintln!("āœ“ Found and stopped running MCP server(s)");
230            } else {
231                return Err(CliError::Server("No MCP server found to stop".to_string()));
232            }
233        }
234    }
235
236    Ok(())
237}
238
239/// Show MCP server status
240pub async fn show_mcp_server_status() -> Result<(), CliError> {
241    info!("Getting MCP server status");
242
243    eprintln!("MCP Server Status");
244    eprintln!("=================");
245
246    // Check if we have a saved PID
247    match get_mcp_server_pid().await {
248        Ok(Some(pid)) => {
249            eprintln!("Saved PID: {}", pid);
250
251            // Check if process is actually running
252            if is_process_running(pid).await? {
253                eprintln!("Status: āœ“ Running");
254
255                // Try to get additional info
256                if let Ok(info) = get_process_info(pid).await {
257                    eprintln!("Uptime: {}", info.uptime);
258                    eprintln!("Memory: {}", info.memory);
259                    eprintln!("CPU: {}%", info.cpu_percent);
260                }
261
262                // Try to check server health
263                match check_server_health().await {
264                    Ok(health) => {
265                        eprintln!("Health: āœ“ Healthy");
266                        eprintln!("Endpoint: {}", health.endpoint);
267                        eprintln!("Response Time: {}ms", health.response_time_ms);
268                    }
269                    Err(e) => {
270                        eprintln!("Health: āœ— Unhealthy ({})", e);
271                    }
272                }
273            } else {
274                eprintln!("Status: āœ— Not running (stale PID)");
275                clear_mcp_server_pid().await?;
276            }
277        }
278        Ok(None) => {
279            eprintln!("Saved PID: None");
280
281            // Try to find running MCP servers
282            let running_processes = find_mcp_processes().await?;
283            if running_processes.is_empty() {
284                eprintln!("Status: āœ— Not running");
285            } else {
286                eprintln!("Status: ⚠ Running (unmanaged)");
287                eprintln!(
288                    "Found {} unmanaged MCP process(es):",
289                    running_processes.len()
290                );
291                for pid in running_processes {
292                    eprintln!("  - PID: {}", pid);
293                }
294            }
295        }
296        Err(e) => {
297            eprintln!("Error: {}", e);
298        }
299    }
300
301    // Show configuration info
302    eprintln!();
303    eprintln!("Configuration:");
304    if let Ok(config_path) = get_default_config_path() {
305        if config_path.exists() {
306            eprintln!("  Config file: {:?}", config_path);
307
308            if let Ok(config) = McpConfig::from_file(&config_path) {
309                eprintln!("  Server host: {}", config.server.host);
310                eprintln!("  Server port: {}", config.server.port);
311                eprintln!("  Max connections: {}", config.server.max_connections);
312                eprintln!("  TLS enabled: {}", config.server.tls_enabled);
313            }
314        } else {
315            eprintln!("  Config file: Not found (using defaults)");
316        }
317    }
318
319    Ok(())
320}
321
322/// List available MCP tools
323pub async fn list_mcp_tools() -> Result<(), CliError> {
324    info!("Listing available MCP tools");
325
326    eprintln!("Available MCP Tools");
327    eprintln!("===================");
328
329    // Create a default server to get tools list
330    let server = create_server()
331        .await
332        .map_err(|e| CliError::Server(format!("Failed to create server: {}", e)))?;
333
334    let stats = server.stats().await;
335
336    eprintln!("Tools Count: {}", stats.tools_count);
337
338    // TODO: Update when MCP server provides tools() method
339    eprintln!("Note: Tool listing not implemented yet in MCP server");
340    eprintln!("Available tool types:");
341    eprintln!("  - vault: Vault operations");
342    eprintln!("  - dag: DAG operations");
343    eprintln!("  - network: Network operations");
344    eprintln!("  - crypto: Cryptographic operations");
345
346    Ok(())
347}
348
349/// List available MCP resources
350pub async fn list_mcp_resources() -> Result<(), CliError> {
351    info!("Listing available MCP resources");
352
353    eprintln!("Available MCP Resources");
354    eprintln!("=======================");
355
356    // Create a default server to get resources list
357    let server = create_server()
358        .await
359        .map_err(|e| CliError::Server(format!("Failed to create server: {}", e)))?;
360
361    let stats = server.stats().await;
362
363    eprintln!("Resources Count: {}", stats.resources_count);
364
365    // TODO: Update when MCP server provides resources() method
366    eprintln!("Note: Resource listing not implemented yet in MCP server");
367    eprintln!("Available resource types:");
368    eprintln!("  - vault_entries: Vault entries and secrets");
369    eprintln!("  - dag_state: DAG state and nodes");
370    eprintln!("  - network_peers: Network peer information");
371    eprintln!("  - system_status: System status and health");
372
373    Ok(())
374}
375
376/// Test MCP server connectivity
377pub async fn test_mcp_server(endpoint: String) -> Result<(), CliError> {
378    info!("Testing MCP server connectivity to {}", endpoint);
379
380    eprintln!("Testing MCP Server Connectivity");
381    eprintln!("===============================");
382    eprintln!("Endpoint: {}", endpoint);
383    eprintln!();
384
385    // Parse endpoint
386    let url = if endpoint.starts_with("http") {
387        endpoint
388    } else {
389        format!("http://{}", endpoint)
390    };
391
392    eprintln!("šŸ”— Connecting to {}...", url);
393
394    // Test basic HTTP connectivity
395    let client = reqwest::Client::new();
396    let start_time = std::time::Instant::now();
397
398    match client
399        .get(&url)
400        .timeout(std::time::Duration::from_secs(10))
401        .send()
402        .await
403    {
404        Ok(response) => {
405            let duration = start_time.elapsed();
406            eprintln!("āœ“ HTTP connection successful");
407            eprintln!("  Status: {}", response.status());
408            eprintln!("  Response time: {}ms", duration.as_millis());
409
410            // Test MCP protocol if it's an MCP endpoint
411            if url.contains("/mcp") || url.ends_with(":3000") {
412                eprintln!();
413                eprintln!("šŸ” Testing MCP protocol...");
414
415                // Send a ping request
416                let ping_request = serde_json::json!({
417                    "jsonrpc": "2.0",
418                    "id": 1,
419                    "method": "ping",
420                    "params": {}
421                });
422
423                let mcp_url = if url.ends_with("/mcp") {
424                    url
425                } else if url.ends_with("/") {
426                    format!("{}mcp", url)
427                } else {
428                    format!("{}/mcp", url)
429                };
430
431                match client.post(&mcp_url).json(&ping_request).send().await {
432                    Ok(mcp_response) => {
433                        if mcp_response.status().is_success() {
434                            eprintln!("āœ“ MCP protocol test successful");
435
436                            // Try to parse JSON-RPC response
437                            if let Ok(body) = mcp_response.text().await {
438                                eprintln!("  Response: {}", body);
439                            }
440                        } else {
441                            eprintln!("āœ— MCP protocol test failed: {}", mcp_response.status());
442                        }
443                    }
444                    Err(e) => {
445                        eprintln!("āœ— MCP protocol test failed: {}", e);
446                    }
447                }
448            }
449        }
450        Err(e) => {
451            let duration = start_time.elapsed();
452            eprintln!("āœ— Connection failed after {}ms", duration.as_millis());
453            eprintln!("  Error: {}", e);
454
455            // Provide troubleshooting suggestions
456            eprintln!();
457            eprintln!("Troubleshooting:");
458            eprintln!("  - Check if MCP server is running: qudag mcp status");
459            eprintln!("  - Verify the endpoint address and port");
460            eprintln!("  - Check firewall settings");
461            eprintln!("  - Try starting MCP server: qudag mcp start");
462
463            return Err(CliError::Server(format!("MCP server test failed: {}", e)));
464        }
465    }
466
467    eprintln!();
468    eprintln!("āœ“ MCP server test completed successfully");
469
470    Ok(())
471}
472
473// Helper functions
474
475async fn setup_shutdown_handler() -> tokio::signal::unix::Signal {
476    use tokio::signal::unix::{signal, SignalKind};
477    signal(SignalKind::interrupt()).expect("Failed to setup signal handler")
478}
479
480async fn save_mcp_server_pid(pid: u32) -> Result<(), CliError> {
481    let pid_file = get_pid_file_path()?;
482    if let Some(parent) = pid_file.parent() {
483        tokio::fs::create_dir_all(parent)
484            .await
485            .map_err(|e| CliError::Config(format!("Failed to create PID directory: {}", e)))?;
486    }
487
488    tokio::fs::write(&pid_file, pid.to_string())
489        .await
490        .map_err(|e| CliError::Config(format!("Failed to save PID file: {}", e)))
491}
492
493async fn get_mcp_server_pid() -> Result<Option<u32>, CliError> {
494    let pid_file = get_pid_file_path()?;
495
496    if !pid_file.exists() {
497        return Ok(None);
498    }
499
500    let content = tokio::fs::read_to_string(&pid_file)
501        .await
502        .map_err(|e| CliError::Config(format!("Failed to read PID file: {}", e)))?;
503
504    let pid = content
505        .trim()
506        .parse::<u32>()
507        .map_err(|e| CliError::Config(format!("Invalid PID in file: {}", e)))?;
508
509    Ok(Some(pid))
510}
511
512async fn clear_mcp_server_pid() -> Result<(), CliError> {
513    let pid_file = get_pid_file_path()?;
514
515    if pid_file.exists() {
516        tokio::fs::remove_file(&pid_file)
517            .await
518            .map_err(|e| CliError::Config(format!("Failed to remove PID file: {}", e)))?;
519    }
520
521    Ok(())
522}
523
524fn get_pid_file_path() -> Result<PathBuf, CliError> {
525    let home = std::env::var("HOME")
526        .map_err(|_| CliError::Config("Unable to determine home directory".to_string()))?;
527    Ok(PathBuf::from(home).join(".qudag").join("mcp-server.pid"))
528}
529
530async fn terminate_process(pid: u32, force: bool) -> Result<bool, CliError> {
531    use tokio::process::Command;
532
533    let signal = if force { "KILL" } else { "TERM" };
534
535    let output = Command::new("kill")
536        .arg(format!("-{}", signal))
537        .arg(pid.to_string())
538        .output()
539        .await
540        .map_err(|e| CliError::Server(format!("Failed to send signal to process: {}", e)))?;
541
542    if output.status.success() {
543        // Wait a moment for the process to terminate
544        tokio::time::sleep(tokio::time::Duration::from_secs(if force { 1 } else { 3 })).await;
545
546        // Check if process is still running
547        Ok(!is_process_running(pid).await?)
548    } else {
549        let error = String::from_utf8_lossy(&output.stderr);
550        if error.contains("No such process") {
551            Ok(true) // Process already gone
552        } else {
553            Err(CliError::Server(format!(
554                "Failed to terminate process: {}",
555                error
556            )))
557        }
558    }
559}
560
561async fn is_process_running(pid: u32) -> Result<bool, CliError> {
562    use tokio::process::Command;
563
564    let output = Command::new("kill")
565        .arg("-0")
566        .arg(pid.to_string())
567        .output()
568        .await
569        .map_err(|e| CliError::Server(format!("Failed to check process: {}", e)))?;
570
571    Ok(output.status.success())
572}
573
574async fn find_mcp_processes() -> Result<Vec<u32>, CliError> {
575    use tokio::process::Command;
576
577    let output = Command::new("pgrep")
578        .arg("-f")
579        .arg("qudag.*mcp.*start")
580        .output()
581        .await
582        .map_err(|e| CliError::Server(format!("Failed to search for MCP processes: {}", e)))?;
583
584    if output.status.success() {
585        let pids = String::from_utf8_lossy(&output.stdout)
586            .lines()
587            .filter_map(|line| line.trim().parse::<u32>().ok())
588            .collect();
589        Ok(pids)
590    } else {
591        Ok(Vec::new())
592    }
593}
594
595async fn find_and_kill_mcp_processes(force: bool) -> Result<bool, CliError> {
596    let pids = find_mcp_processes().await?;
597
598    if pids.is_empty() {
599        return Ok(false);
600    }
601
602    let mut killed_any = false;
603    for pid in pids {
604        if terminate_process(pid, force).await? {
605            killed_any = true;
606        }
607    }
608
609    Ok(killed_any)
610}
611
612#[derive(Debug)]
613struct ProcessInfo {
614    uptime: String,
615    memory: String,
616    cpu_percent: f64,
617}
618
619async fn get_process_info(_pid: u32) -> Result<ProcessInfo, CliError> {
620    // This is a simplified implementation
621    // In a real implementation, you would parse /proc/{pid}/stat and /proc/{pid}/status
622    Ok(ProcessInfo {
623        uptime: "Unknown".to_string(),
624        memory: "Unknown".to_string(),
625        cpu_percent: 0.0,
626    })
627}
628
629#[derive(Debug)]
630struct ServerHealth {
631    endpoint: String,
632    response_time_ms: u64,
633}
634
635async fn check_server_health() -> Result<ServerHealth, CliError> {
636    // This would implement a health check against the running MCP server
637    // For now, return a mock result
638    Ok(ServerHealth {
639        endpoint: "http://127.0.0.1:3000/mcp".to_string(),
640        response_time_ms: 25,
641    })
642}
643
644fn get_default_config_path() -> Result<PathBuf, CliError> {
645    let home = std::env::var("HOME")
646        .map_err(|_| CliError::Config("Unable to determine home directory".to_string()))?;
647    Ok(PathBuf::from(home).join(".qudag").join("mcp-config.toml"))
648}