Skip to main content

codetether_agent/mcp/
server.rs

1//! MCP Server - Exposes CodeTether tools to MCP clients
2//!
3//! Runs as a stdio-based MCP server that can be connected to by:
4//! - Claude Desktop
5//! - Other MCP clients
6//!
7//! Exposed tools include:
8//! - run_command: Execute shell commands
9//! - read_file: Read file contents
10//! - write_file: Write file contents
11//! - search_files: Search for files
12//! - swarm: Execute tasks with parallel sub-agents
13//! - rlm: Analyze large content
14//! - ralph: Autonomous PRD-driven execution
15
16use super::bus_bridge::BusBridge;
17use super::transport::{McpMessage, StdioTransport, Transport};
18use super::types::*;
19use crate::bus::{AgentBus, BusMessage};
20use crate::tool::ToolRegistry;
21use anyhow::Result;
22use serde_json::{Value, json};
23use std::collections::HashMap;
24use std::sync::Arc;
25use tokio::sync::RwLock;
26use tracing::{debug, info, warn};
27
28/// MCP Server implementation
29pub struct McpServer {
30    transport: Arc<dyn Transport>,
31    tools: RwLock<HashMap<String, McpToolHandler>>,
32    resources: RwLock<HashMap<String, McpResourceHandler>>,
33    /// Prompt handlers for MCP prompts (reserved for future use)
34    #[allow(dead_code)]
35    prompts: RwLock<HashMap<String, McpPromptHandler>>,
36    initialized: RwLock<bool>,
37    server_info: ServerInfo,
38    /// Tool metadata storage for querying tool information
39    metadata: RwLock<HashMap<String, ToolMetadata>>,
40    /// Resource metadata storage for querying resource information
41    resource_metadata: RwLock<HashMap<String, ResourceMetadata>>,
42    /// Optional bus bridge for live event monitoring
43    bus: Option<Arc<BusBridge>>,
44    /// Optional local agent bus for publishing tool calls to S3 sink
45    agent_bus: Option<Arc<AgentBus>>,
46    /// Optional tool registry bridging all CodeTether tools to MCP
47    tool_registry: Option<Arc<ToolRegistry>>,
48}
49
50type McpToolHandler = Arc<dyn Fn(Value) -> Result<CallToolResult> + Send + Sync>;
51type McpResourceHandler = Arc<dyn Fn(String) -> Result<ReadResourceResult> + Send + Sync>;
52type McpPromptHandler = Arc<dyn Fn(Value) -> Result<GetPromptResult> + Send + Sync>;
53
54impl McpServer {
55    /// Create a new MCP server over stdio
56    pub fn new_stdio() -> Self {
57        // Use Arc's unsized coercion to convert Arc<StdioTransport> -> Arc<dyn Transport>
58        let transport: Arc<dyn Transport> = Arc::new(StdioTransport::new());
59        Self::new(transport)
60    }
61
62    /// Create a new MCP server for in-process/local usage.
63    ///
64    /// Unlike [`Self::new_stdio`], this does not spawn any stdio reader/writer threads
65    /// and will not lock stdout. This is intended for CLI flows that need to query
66    /// tool metadata or invoke tools directly without running a long-lived stdio server.
67    pub fn new_local() -> Self {
68        let transport: Arc<dyn Transport> = Arc::new(super::transport::NullTransport::new());
69        Self::new(transport)
70    }
71
72    /// Create a new MCP server with custom transport
73    pub fn new(transport: Arc<dyn Transport>) -> Self {
74        let mut server = Self {
75            transport,
76            tools: RwLock::new(HashMap::new()),
77            resources: RwLock::new(HashMap::new()),
78            prompts: RwLock::new(HashMap::new()),
79            initialized: RwLock::new(false),
80            server_info: ServerInfo {
81                name: "codetether".to_string(),
82                version: env!("CARGO_PKG_VERSION").to_string(),
83            },
84            metadata: RwLock::new(HashMap::new()),
85            resource_metadata: RwLock::new(HashMap::new()),
86            bus: None,
87            agent_bus: None,
88            tool_registry: None,
89        };
90
91        // Register default tools
92        server.register_default_tools();
93
94        server
95    }
96
97    /// Attach the full CodeTether tool registry to the MCP server.
98    ///
99    /// All tools from the registry will be exposed as MCP tools, replacing
100    /// the hardcoded basic tool set. Call before [`Self::run`].
101    pub fn with_tool_registry(mut self, registry: Arc<ToolRegistry>) -> Self {
102        self.tool_registry = Some(registry);
103        self
104    }
105
106    /// Attach a local agent bus for publishing tool calls to the S3 sink.
107    ///
108    /// Call this *before* [`Self::run`] so every tool invocation gets
109    /// recorded as a `ToolRequest` + `ToolResponse` on the bus.
110    pub fn with_agent_bus(mut self, bus: Arc<AgentBus>) -> Self {
111        self.agent_bus = Some(bus);
112        self
113    }
114
115    /// Attach a bus bridge and register bus-aware tools + resources.
116    ///
117    /// Call this *before* [`Self::run`] to enable live bus monitoring.
118    pub async fn with_bus(self, bus_url: String) -> Self {
119        self.with_bus_auth(bus_url, None).await
120    }
121
122    /// Attach a bus bridge with optional authentication.
123    pub async fn with_bus_auth(mut self, bus_url: String, auth_token: Option<String>) -> Self {
124        let bridge = BusBridge::with_auth(bus_url, auth_token).spawn();
125        self.bus = Some(Arc::clone(&bridge));
126        self.register_bus_tools(Arc::clone(&bridge)).await;
127        self.register_bus_resources(Arc::clone(&bridge)).await;
128        self
129    }
130
131    /// Register bus-specific MCP tools.
132    async fn register_bus_tools(&self, bridge: Arc<BusBridge>) {
133        // ── bus_events ──────────────────────────────────────────────
134        let b = Arc::clone(&bridge);
135        self.register_tool(
136            "bus_events",
137            "Query recent events from the agent bus. Returns BusEnvelope JSON objects \
138             matching the optional topic filter (supports wildcards like 'ralph.*').",
139            json!({
140                "type": "object",
141                "properties": {
142                    "topic_filter": {
143                        "type": "string",
144                        "description": "Topic pattern to filter (e.g. 'ralph.*', 'agent.*', '*'). Default: all."
145                    },
146                    "limit": {
147                        "type": "integer",
148                        "description": "Max events to return (default: 50, max: 500)"
149                    }
150                }
151            }),
152            Arc::new(move |args| {
153                let topic_filter = args.get("topic_filter").and_then(|v| v.as_str()).map(String::from);
154                let limit = args
155                    .get("limit")
156                    .and_then(|v| v.as_u64())
157                    .unwrap_or(50)
158                    .min(500) as usize;
159
160                let b = Arc::clone(&b);
161                let events = tokio::task::block_in_place(|| {
162                    tokio::runtime::Handle::current().block_on(async {
163                        b.recent_events(topic_filter.as_deref(), limit, None).await
164                    })
165                });
166
167                let text = serde_json::to_string_pretty(&events)
168                    .unwrap_or_else(|_| "[]".to_string());
169
170                Ok(CallToolResult {
171                    content: vec![ToolContent::Text { text }],
172                    is_error: false,
173                })
174            }),
175        )
176        .await;
177
178        // ── bus_status ──────────────────────────────────────────────
179        let b = Arc::clone(&bridge);
180        self.register_tool(
181            "bus_status",
182            "Get the current status of the bus bridge: connection state, event count, \
183             and buffer usage.",
184            json!({
185                "type": "object",
186                "properties": {}
187            }),
188            Arc::new(move |_args| {
189                let status = b.status();
190                let buffer_len = tokio::task::block_in_place(|| {
191                    tokio::runtime::Handle::current().block_on(b.buffer_len())
192                });
193
194                let text = serde_json::to_string_pretty(&json!({
195                    "connected": status.connected,
196                    "total_received": status.total_received,
197                    "buffer_used": buffer_len,
198                    "buffer_capacity": status.buffer_capacity,
199                    "bus_url": status.bus_url,
200                }))
201                .unwrap_or_default();
202
203                Ok(CallToolResult {
204                    content: vec![ToolContent::Text { text }],
205                    is_error: false,
206                })
207            }),
208        )
209        .await;
210
211        // ── ralph_status ────────────────────────────────────────────
212        self.register_tool(
213            "ralph_status",
214            "Get current Ralph PRD status: story pass/fail states, iteration count, \
215             and progress.txt content. Reads prd.json and progress.txt from the \
216             current working directory.",
217            json!({
218                "type": "object",
219                "properties": {
220                    "prd_path": {
221                        "type": "string",
222                        "description": "Path to prd.json (default: ./prd.json)"
223                    }
224                }
225            }),
226            Arc::new(|args| {
227                let prd_path = args
228                    .get("prd_path")
229                    .and_then(|v| v.as_str())
230                    .unwrap_or("prd.json");
231
232                let mut result = json!({});
233
234                // Read PRD
235                if let Ok(content) = std::fs::read_to_string(prd_path) {
236                    if let Ok(prd) = serde_json::from_str::<serde_json::Value>(&content) {
237                        let stories = prd.get("user_stories").and_then(|s| s.as_array());
238                        let passed = stories
239                            .map(|arr| {
240                                arr.iter()
241                                    .filter(|s| {
242                                        s.get("passes").and_then(|v| v.as_bool()).unwrap_or(false)
243                                    })
244                                    .count()
245                            })
246                            .unwrap_or(0);
247                        let total = stories.map(|arr| arr.len()).unwrap_or(0);
248
249                        result["prd"] = prd;
250                        result["summary"] = json!({
251                            "passed": passed,
252                            "total": total,
253                            "progress_pct": if total > 0 { (passed * 100) / total } else { 0 },
254                        });
255                    }
256                } else {
257                    result["prd_error"] = json!("prd.json not found");
258                }
259
260                // Read progress.txt
261                let progress_path = std::path::Path::new(prd_path)
262                    .parent()
263                    .unwrap_or(std::path::Path::new("."))
264                    .join("progress.txt");
265                if let Ok(progress) = std::fs::read_to_string(&progress_path) {
266                    result["progress"] = json!(progress);
267                }
268
269                let text = serde_json::to_string_pretty(&result).unwrap_or_default();
270
271                Ok(CallToolResult {
272                    content: vec![ToolContent::Text { text }],
273                    is_error: false,
274                })
275            }),
276        )
277        .await;
278
279        info!("Registered bus-aware MCP tools: bus_events, bus_status, ralph_status");
280    }
281
282    /// Register bus-specific MCP resources.
283    async fn register_bus_resources(&self, bridge: Arc<BusBridge>) {
284        // ── codetether://bus/events/recent ───────────────────────────
285        let b = Arc::clone(&bridge);
286        self.register_resource(
287            "codetether://bus/events/recent",
288            "Recent Bus Events",
289            "Last 100 events from the agent bus (JSON array of BusEnvelope)",
290            Some("application/json"),
291            Arc::new(move |_uri| {
292                let events = tokio::task::block_in_place(|| {
293                    tokio::runtime::Handle::current()
294                        .block_on(async { b.recent_events(None, 100, None).await })
295                });
296                let text =
297                    serde_json::to_string_pretty(&events).unwrap_or_else(|_| "[]".to_string());
298                Ok(ReadResourceResult {
299                    contents: vec![ResourceContents {
300                        uri: "codetether://bus/events/recent".to_string(),
301                        mime_type: Some("application/json".to_string()),
302                        content: ResourceContent::Text { text },
303                    }],
304                })
305            }),
306        )
307        .await;
308
309        // ── codetether://ralph/prd ──────────────────────────────────
310        self.register_resource(
311            "codetether://ralph/prd",
312            "Ralph PRD",
313            "Current PRD JSON with story pass/fail status",
314            Some("application/json"),
315            Arc::new(|_uri| {
316                let text = std::fs::read_to_string("prd.json")
317                    .unwrap_or_else(|_| r#"{"error": "prd.json not found"}"#.to_string());
318                Ok(ReadResourceResult {
319                    contents: vec![ResourceContents {
320                        uri: "codetether://ralph/prd".to_string(),
321                        mime_type: Some("application/json".to_string()),
322                        content: ResourceContent::Text { text },
323                    }],
324                })
325            }),
326        )
327        .await;
328
329        // ── codetether://ralph/progress ─────────────────────────────
330        self.register_resource(
331            "codetether://ralph/progress",
332            "Ralph Progress",
333            "progress.txt content from the current Ralph run",
334            Some("text/plain"),
335            Arc::new(|_uri| {
336                let text = std::fs::read_to_string("progress.txt")
337                    .unwrap_or_else(|_| "(no progress.txt found)".to_string());
338                Ok(ReadResourceResult {
339                    contents: vec![ResourceContents {
340                        uri: "codetether://ralph/progress".to_string(),
341                        mime_type: Some("text/plain".to_string()),
342                        content: ResourceContent::Text { text },
343                    }],
344                })
345            }),
346        )
347        .await;
348
349        info!("Registered bus-aware MCP resources");
350    }
351
352    /// Register default CodeTether tools
353    fn register_default_tools(&mut self) {
354        // These will be registered synchronously in the constructor
355        // The actual tool handlers will be added in run()
356    }
357
358    /// Register a tool
359    pub async fn register_tool(
360        &self,
361        name: &str,
362        description: &str,
363        input_schema: Value,
364        handler: McpToolHandler,
365    ) {
366        // Store tool metadata
367        let metadata = ToolMetadata::new(
368            name.to_string(),
369            Some(description.to_string()),
370            input_schema.clone(),
371        );
372
373        let mut metadata_map = self.metadata.write().await;
374        metadata_map.insert(name.to_string(), metadata);
375        drop(metadata_map);
376
377        let mut tools = self.tools.write().await;
378        tools.insert(name.to_string(), handler);
379
380        debug!("Registered MCP tool: {}", name);
381    }
382
383    /// Register a resource
384    pub async fn register_resource(
385        &self,
386        uri: &str,
387        name: &str,
388        description: &str,
389        mime_type: Option<&str>,
390        handler: McpResourceHandler,
391    ) {
392        // Store resource metadata
393        let metadata = ResourceMetadata::new(
394            uri.to_string(),
395            name.to_string(),
396            Some(description.to_string()),
397            mime_type.map(|s| s.to_string()),
398        );
399
400        let mut metadata_map = self.resource_metadata.write().await;
401        metadata_map.insert(uri.to_string(), metadata);
402        drop(metadata_map);
403
404        let mut resources = self.resources.write().await;
405        resources.insert(uri.to_string(), handler);
406
407        debug!("Registered MCP resource: {}", uri);
408    }
409
410    /// Get tool metadata by name
411    pub async fn get_tool_metadata(&self, name: &str) -> Option<ToolMetadata> {
412        let metadata = self.metadata.read().await;
413        metadata.get(name).cloned()
414    }
415
416    /// Get all tool metadata
417    pub async fn get_all_tool_metadata(&self) -> Vec<ToolMetadata> {
418        let metadata = self.metadata.read().await;
419        metadata.values().cloned().collect()
420    }
421
422    /// Get resource metadata by URI
423    pub async fn get_resource_metadata(&self, uri: &str) -> Option<ResourceMetadata> {
424        let metadata = self.resource_metadata.read().await;
425        metadata.get(uri).cloned()
426    }
427
428    /// Get all resource metadata
429    pub async fn get_all_resource_metadata(&self) -> Vec<ResourceMetadata> {
430        let metadata = self.resource_metadata.read().await;
431        metadata.values().cloned().collect()
432    }
433
434    /// Register a prompt handler
435    pub async fn register_prompt(&self, name: &str, handler: McpPromptHandler) {
436        let mut prompts = self.prompts.write().await;
437        prompts.insert(name.to_string(), handler);
438        debug!("Registered MCP prompt: {}", name);
439    }
440
441    /// Get a prompt handler by name
442    pub async fn get_prompt_handler(&self, name: &str) -> Option<McpPromptHandler> {
443        let prompts = self.prompts.read().await;
444        prompts.get(name).cloned()
445    }
446
447    /// List all registered prompt names
448    pub async fn list_prompts(&self) -> Vec<String> {
449        let prompts = self.prompts.read().await;
450        prompts.keys().cloned().collect()
451    }
452
453    /// Run the MCP server (main loop)
454    pub async fn run(&self) -> Result<()> {
455        info!("Starting MCP server...");
456
457        // Register tools before starting
458        self.setup_tools().await;
459
460        loop {
461            match self.transport.receive().await? {
462                Some(McpMessage::Request(request)) => {
463                    let response = self.handle_request(request).await;
464                    self.transport.send_response(response).await?;
465                }
466                Some(McpMessage::Notification(notification)) => {
467                    self.handle_notification(notification).await;
468                }
469                Some(McpMessage::Response(response)) => {
470                    // We received a response (shouldn't happen in server mode)
471                    warn!("Unexpected response received: {:?}", response.id);
472                }
473                None => {
474                    info!("Transport closed, shutting down MCP server");
475                    break;
476                }
477            }
478        }
479
480        Ok(())
481    }
482
483    /// Setup default tools (public, for CLI use)
484    pub async fn setup_tools_public(&self) {
485        self.setup_tools().await;
486    }
487
488    /// Call a tool directly without going through the transport
489    pub async fn call_tool_direct(&self, name: &str, arguments: Value) -> Result<CallToolResult> {
490        let tools = self.tools.read().await;
491        let handler = tools
492            .get(name)
493            .ok_or_else(|| anyhow::anyhow!("Tool not found: {}", name))?
494            .clone();
495        drop(tools);
496        handler(arguments)
497    }
498
499    /// Setup default tools
500    async fn setup_tools(&self) {
501        // If a ToolRegistry was provided, bridge ALL its tools into MCP
502        if let Some(registry) = &self.tool_registry {
503            self.register_registry_tools(registry).await;
504            info!(
505                "Registered {} MCP tools from ToolRegistry",
506                self.tools.read().await.len()
507            );
508            return;
509        }
510
511        // Fallback: register basic hardcoded tools when no registry is available
512        self.register_fallback_tools().await;
513        info!(
514            "Registered {} MCP tools (fallback)",
515            self.tools.read().await.len()
516        );
517    }
518
519    /// Bridge all tools from a ToolRegistry into MCP tool handlers.
520    async fn register_registry_tools(&self, registry: &Arc<ToolRegistry>) {
521        // Skip tools that don't make sense over MCP:
522        // - question: interactive TUI-only prompt
523        // - invalid: internal error handler
524        // - batch: needs weak registry reference, internal orchestration
525        // - confirm_edit / confirm_multiedit: dead TUI confirmation tools
526        // - plan_enter / plan_exit: session-state dependent TUI tools
527        // - voice / podcast / youtube / avatar / image: media generation tools
528        // - undo: git undo, dangerous without TUI context
529        let skip_tools = [
530            "question",
531            "invalid",
532            "batch",
533            "confirm_edit",
534            "confirm_multiedit",
535            "plan_enter",
536            "plan_exit",
537            "voice",
538            "podcast",
539            "youtube",
540            "avatar",
541            "image",
542            "undo",
543        ];
544
545        for tool_id in registry.list() {
546            if skip_tools.contains(&tool_id) {
547                continue;
548            }
549
550            let tool = match registry.get(tool_id) {
551                Some(t) => t,
552                None => continue,
553            };
554
555            let tool_clone = Arc::clone(&tool);
556
557            self.register_tool(
558                tool.id(),
559                tool.description(),
560                tool.parameters(),
561                Arc::new(move |args: Value| {
562                    let tool = Arc::clone(&tool_clone);
563                    let result = tokio::task::block_in_place(|| {
564                        tokio::runtime::Handle::current()
565                            .block_on(async { tool.execute(args).await })
566                    });
567
568                    match result {
569                        Ok(tool_result) => Ok(CallToolResult {
570                            content: vec![ToolContent::Text {
571                                text: tool_result.output,
572                            }],
573                            is_error: !tool_result.success,
574                        }),
575                        Err(e) => Ok(CallToolResult {
576                            content: vec![ToolContent::Text {
577                                text: e.to_string(),
578                            }],
579                            is_error: true,
580                        }),
581                    }
582                }),
583            )
584            .await;
585        }
586    }
587
588    /// Fallback hardcoded tools for when no ToolRegistry is provided.
589    async fn register_fallback_tools(&self) {
590        // run_command tool
591        self.register_tool(
592            "run_command",
593            "Execute a shell command and return the output",
594            json!({
595                "type": "object",
596                "properties": {
597                    "command": {
598                        "type": "string",
599                        "description": "The command to execute"
600                    },
601                    "cwd": {
602                        "type": "string",
603                        "description": "Working directory (optional)"
604                    },
605                    "timeout_ms": {
606                        "type": "integer",
607                        "description": "Timeout in milliseconds (default: 30000)"
608                    }
609                },
610                "required": ["command"]
611            }),
612            Arc::new(|args| {
613                let command = args
614                    .get("command")
615                    .and_then(|v| v.as_str())
616                    .ok_or_else(|| anyhow::anyhow!("Missing command"))?;
617
618                let cwd = args.get("cwd").and_then(|v| v.as_str());
619
620                let mut cmd = std::process::Command::new("/bin/sh");
621                cmd.arg("-c").arg(command);
622
623                if let Some(dir) = cwd {
624                    cmd.current_dir(dir);
625                }
626
627                let output = cmd.output()?;
628                let stdout = String::from_utf8_lossy(&output.stdout);
629                let stderr = String::from_utf8_lossy(&output.stderr);
630
631                let result = if output.status.success() {
632                    format!("{}{}", stdout, stderr)
633                } else {
634                    format!(
635                        "Exit code: {}\n{}{}",
636                        output.status.code().unwrap_or(-1),
637                        stdout,
638                        stderr
639                    )
640                };
641
642                Ok(CallToolResult {
643                    content: vec![ToolContent::Text { text: result }],
644                    is_error: !output.status.success(),
645                })
646            }),
647        )
648        .await;
649
650        // read_file tool
651        self.register_tool(
652            "read_file",
653            "Read the contents of a file",
654            json!({
655                "type": "object",
656                "properties": {
657                    "path": {
658                        "type": "string",
659                        "description": "Path to the file to read"
660                    },
661                    "offset": {
662                        "type": "integer",
663                        "description": "Line offset to start reading from (1-indexed)"
664                    },
665                    "limit": {
666                        "type": "integer",
667                        "description": "Maximum number of lines to read"
668                    }
669                },
670                "required": ["path"]
671            }),
672            Arc::new(|args| {
673                let path = args
674                    .get("path")
675                    .and_then(|v| v.as_str())
676                    .ok_or_else(|| anyhow::anyhow!("Missing path"))?;
677
678                let content = std::fs::read_to_string(path)?;
679
680                let offset = args.get("offset").and_then(|v| v.as_u64()).unwrap_or(1) as usize;
681                let limit = args.get("limit").and_then(|v| v.as_u64());
682
683                let lines: Vec<&str> = content.lines().collect();
684                let start = (offset.saturating_sub(1)).min(lines.len());
685                let end = if let Some(l) = limit {
686                    (start + l as usize).min(lines.len())
687                } else {
688                    lines.len()
689                };
690
691                let result = lines[start..end].join("\n");
692
693                Ok(CallToolResult {
694                    content: vec![ToolContent::Text { text: result }],
695                    is_error: false,
696                })
697            }),
698        )
699        .await;
700
701        // write_file tool
702        self.register_tool(
703            "write_file",
704            "Write content to a file",
705            json!({
706                "type": "object",
707                "properties": {
708                    "path": {
709                        "type": "string",
710                        "description": "Path to the file to write"
711                    },
712                    "content": {
713                        "type": "string",
714                        "description": "Content to write"
715                    },
716                    "create_dirs": {
717                        "type": "boolean",
718                        "description": "Create parent directories if they don't exist"
719                    }
720                },
721                "required": ["path", "content"]
722            }),
723            Arc::new(|args| {
724                let path = args
725                    .get("path")
726                    .and_then(|v| v.as_str())
727                    .ok_or_else(|| anyhow::anyhow!("Missing path"))?;
728
729                let content = args
730                    .get("content")
731                    .and_then(|v| v.as_str())
732                    .ok_or_else(|| anyhow::anyhow!("Missing content"))?;
733
734                let create_dirs = args
735                    .get("create_dirs")
736                    .and_then(|v| v.as_bool())
737                    .unwrap_or(false);
738
739                if create_dirs && let Some(parent) = std::path::Path::new(path).parent() {
740                    std::fs::create_dir_all(parent)?;
741                }
742
743                std::fs::write(path, content)?;
744
745                Ok(CallToolResult {
746                    content: vec![ToolContent::Text {
747                        text: format!("Wrote {} bytes to {}", content.len(), path),
748                    }],
749                    is_error: false,
750                })
751            }),
752        )
753        .await;
754
755        // list_directory tool
756        self.register_tool(
757            "list_directory",
758            "List contents of a directory",
759            json!({
760                "type": "object",
761                "properties": {
762                    "path": {
763                        "type": "string",
764                        "description": "Path to the directory"
765                    },
766                    "recursive": {
767                        "type": "boolean",
768                        "description": "List recursively"
769                    },
770                    "max_depth": {
771                        "type": "integer",
772                        "description": "Maximum depth for recursive listing"
773                    }
774                },
775                "required": ["path"]
776            }),
777            Arc::new(|args| {
778                let path = args
779                    .get("path")
780                    .and_then(|v| v.as_str())
781                    .ok_or_else(|| anyhow::anyhow!("Missing path"))?;
782
783                let mut entries = Vec::new();
784                for entry in std::fs::read_dir(path)? {
785                    let entry = entry?;
786                    let file_type = entry.file_type()?;
787                    let name = entry.file_name().to_string_lossy().to_string();
788                    let suffix = if file_type.is_dir() { "/" } else { "" };
789                    entries.push(format!("{}{}", name, suffix));
790                }
791
792                entries.sort();
793
794                Ok(CallToolResult {
795                    content: vec![ToolContent::Text {
796                        text: entries.join("\n"),
797                    }],
798                    is_error: false,
799                })
800            }),
801        )
802        .await;
803
804        // search_files tool
805        self.register_tool(
806            "search_files",
807            "Search for files matching a pattern",
808            json!({
809                "type": "object",
810                "properties": {
811                    "pattern": {
812                        "type": "string",
813                        "description": "Search pattern (glob or regex)"
814                    },
815                    "path": {
816                        "type": "string",
817                        "description": "Directory to search in"
818                    },
819                    "content_pattern": {
820                        "type": "string",
821                        "description": "Pattern to search in file contents"
822                    }
823                },
824                "required": ["pattern"]
825            }),
826            Arc::new(|args| {
827                let pattern = args
828                    .get("pattern")
829                    .and_then(|v| v.as_str())
830                    .ok_or_else(|| anyhow::anyhow!("Missing pattern"))?;
831
832                let path = args.get("path").and_then(|v| v.as_str()).unwrap_or(".");
833
834                // Simple glob using find command
835                let output = std::process::Command::new("find")
836                    .args([path, "-name", pattern, "-type", "f"])
837                    .output()?;
838
839                let result = String::from_utf8_lossy(&output.stdout);
840
841                Ok(CallToolResult {
842                    content: vec![ToolContent::Text {
843                        text: result.to_string(),
844                    }],
845                    is_error: !output.status.success(),
846                })
847            }),
848        )
849        .await;
850
851        // grep_search tool
852        self.register_tool(
853            "grep_search",
854            "Search file contents using grep",
855            json!({
856                "type": "object",
857                "properties": {
858                    "query": {
859                        "type": "string",
860                        "description": "Search pattern"
861                    },
862                    "path": {
863                        "type": "string",
864                        "description": "Directory or file to search"
865                    },
866                    "is_regex": {
867                        "type": "boolean",
868                        "description": "Treat pattern as regex"
869                    },
870                    "case_sensitive": {
871                        "type": "boolean",
872                        "description": "Case-sensitive search"
873                    }
874                },
875                "required": ["query"]
876            }),
877            Arc::new(|args| {
878                let query = args
879                    .get("query")
880                    .and_then(|v| v.as_str())
881                    .ok_or_else(|| anyhow::anyhow!("Missing query"))?;
882
883                let path = args.get("path").and_then(|v| v.as_str()).unwrap_or(".");
884
885                let is_regex = args
886                    .get("is_regex")
887                    .and_then(|v| v.as_bool())
888                    .unwrap_or(false);
889
890                let case_sensitive = args
891                    .get("case_sensitive")
892                    .and_then(|v| v.as_bool())
893                    .unwrap_or(false);
894
895                let mut cmd = std::process::Command::new("grep");
896                cmd.arg("-r").arg("-n");
897
898                if !case_sensitive {
899                    cmd.arg("-i");
900                }
901
902                if is_regex {
903                    cmd.arg("-E");
904                } else {
905                    cmd.arg("-F");
906                }
907
908                cmd.arg(query).arg(path);
909
910                let output = cmd.output()?;
911                let result = String::from_utf8_lossy(&output.stdout);
912
913                Ok(CallToolResult {
914                    content: vec![ToolContent::Text {
915                        text: result.to_string(),
916                    }],
917                    is_error: false,
918                })
919            }),
920        )
921        .await;
922
923        info!("Registered {} MCP tools", self.tools.read().await.len());
924    }
925
926    /// Handle a JSON-RPC request
927    async fn handle_request(&self, request: JsonRpcRequest) -> JsonRpcResponse {
928        debug!(
929            "Handling request: {} (id: {:?})",
930            request.method, request.id
931        );
932
933        let result = match request.method.as_str() {
934            "initialize" => self.handle_initialize(request.params).await,
935            "initialized" => Ok(json!({})),
936            "ping" => Ok(json!({})),
937            "tools/list" => self.handle_list_tools(request.params).await,
938            "tools/call" => self.handle_call_tool(request.params).await,
939            "resources/list" => self.handle_list_resources(request.params).await,
940            "resources/read" => self.handle_read_resource(request.params).await,
941            "prompts/list" => self.handle_list_prompts(request.params).await,
942            "prompts/get" => self.handle_get_prompt(request.params).await,
943            _ => Err(JsonRpcError::method_not_found(&request.method)),
944        };
945
946        match result {
947            Ok(value) => JsonRpcResponse::success(request.id, value),
948            Err(error) => JsonRpcResponse::error(request.id, error),
949        }
950    }
951
952    /// Handle a notification
953    async fn handle_notification(&self, notification: JsonRpcNotification) {
954        debug!("Handling notification: {}", notification.method);
955
956        match notification.method.as_str() {
957            "notifications/initialized" => {
958                *self.initialized.write().await = true;
959                info!("MCP client initialized");
960            }
961            "notifications/cancelled" => {
962                // Handle cancellation
963            }
964            _ => {
965                debug!("Unknown notification: {}", notification.method);
966            }
967        }
968    }
969
970    /// Handle initialize request
971    async fn handle_initialize(&self, params: Option<Value>) -> Result<Value, JsonRpcError> {
972        let _params: InitializeParams = if let Some(p) = params {
973            serde_json::from_value(p).map_err(|e| JsonRpcError::invalid_params(e.to_string()))?
974        } else {
975            return Err(JsonRpcError::invalid_params("Missing params"));
976        };
977
978        let result = InitializeResult {
979            protocol_version: PROTOCOL_VERSION.to_string(),
980            capabilities: ServerCapabilities {
981                tools: Some(ToolsCapability { list_changed: true }),
982                resources: Some(ResourcesCapability {
983                    subscribe: false,
984                    list_changed: true,
985                }),
986                prompts: Some(PromptsCapability { list_changed: true }),
987                logging: Some(LoggingCapability {}),
988                experimental: None,
989            },
990            server_info: self.server_info.clone(),
991            instructions: Some(
992                "CodeTether is an AI coding agent with tools for file operations, \
993                 command execution, code search, and autonomous task execution. \
994                 Use the swarm tool for complex tasks requiring parallel execution, \
995                 and ralph for PRD-driven development."
996                    .to_string(),
997            ),
998        };
999
1000        serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1001    }
1002
1003    /// Handle list tools request - reads from ToolMetadata registry
1004    async fn handle_list_tools(&self, _params: Option<Value>) -> Result<Value, JsonRpcError> {
1005        // Read tools from the metadata registry instead of hardcoded list
1006        let metadata = self.metadata.read().await;
1007
1008        let tool_list: Vec<McpTool> = metadata
1009            .values()
1010            .map(|tm| McpTool {
1011                name: tm.name.clone(),
1012                description: tm.description.clone(),
1013                input_schema: tm.input_schema.clone(),
1014            })
1015            .collect();
1016
1017        let result = ListToolsResult {
1018            tools: tool_list,
1019            next_cursor: None,
1020        };
1021
1022        serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1023    }
1024
1025    /// Handle call tool request
1026    async fn handle_call_tool(&self, params: Option<Value>) -> Result<Value, JsonRpcError> {
1027        let params: CallToolParams = if let Some(p) = params {
1028            serde_json::from_value(p).map_err(|e| JsonRpcError::invalid_params(e.to_string()))?
1029        } else {
1030            return Err(JsonRpcError::invalid_params("Missing params"));
1031        };
1032
1033        // Publish ToolRequest to the agent bus (picked up by S3 sink)
1034        let request_id = uuid::Uuid::new_v4().to_string();
1035        let bus_handle = self.agent_bus.as_ref().map(|bus| bus.handle("mcp-server"));
1036        if let Some(ref bh) = bus_handle {
1037            bh.send(
1038                format!("tools.{}", &params.name),
1039                BusMessage::ToolRequest {
1040                    request_id: request_id.clone(),
1041                    agent_id: "mcp-server".into(),
1042                    tool_name: params.name.clone(),
1043                    arguments: params.arguments.clone(),
1044                    step: 0,
1045                },
1046            );
1047        }
1048
1049        let tools = self.tools.read().await;
1050        let handler = tools
1051            .get(&params.name)
1052            .ok_or_else(|| JsonRpcError::method_not_found(&params.name))?;
1053
1054        let (result_value, output_text, success) = match handler(params.arguments) {
1055            Ok(result) => {
1056                let text = result
1057                    .content
1058                    .iter()
1059                    .filter_map(|c| match c {
1060                        ToolContent::Text { text } => Some(text.as_str()),
1061                        _ => None,
1062                    })
1063                    .collect::<Vec<_>>()
1064                    .join("\n");
1065                let is_err = result.is_error;
1066                let val = serde_json::to_value(result)
1067                    .map_err(|e| JsonRpcError::internal_error(e.to_string()))?;
1068                (val, text, !is_err)
1069            }
1070            Err(e) => {
1071                let err_text = e.to_string();
1072                let result = CallToolResult {
1073                    content: vec![ToolContent::Text {
1074                        text: err_text.clone(),
1075                    }],
1076                    is_error: true,
1077                };
1078                let val = serde_json::to_value(result)
1079                    .map_err(|e| JsonRpcError::internal_error(e.to_string()))?;
1080                (val, err_text, false)
1081            }
1082        };
1083
1084        // Publish ToolResponse to the agent bus
1085        if let Some(ref bh) = bus_handle {
1086            bh.send(
1087                format!("tools.{}", &params.name),
1088                BusMessage::ToolResponse {
1089                    request_id,
1090                    agent_id: "mcp-server".into(),
1091                    tool_name: params.name,
1092                    result: output_text,
1093                    success,
1094                    step: 0,
1095                },
1096            );
1097        }
1098
1099        Ok(result_value)
1100    }
1101
1102    /// Handle list resources request — reads from registered resource metadata
1103    async fn handle_list_resources(&self, _params: Option<Value>) -> Result<Value, JsonRpcError> {
1104        let metadata = self.resource_metadata.read().await;
1105        let resource_list: Vec<McpResource> = metadata
1106            .values()
1107            .map(|rm| McpResource {
1108                uri: rm.uri.clone(),
1109                name: rm.name.clone(),
1110                description: rm.description.clone(),
1111                mime_type: rm.mime_type.clone(),
1112            })
1113            .collect();
1114
1115        let result = ListResourcesResult {
1116            resources: resource_list,
1117            next_cursor: None,
1118        };
1119
1120        serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1121    }
1122
1123    /// Handle read resource request
1124    async fn handle_read_resource(&self, params: Option<Value>) -> Result<Value, JsonRpcError> {
1125        let params: ReadResourceParams = if let Some(p) = params {
1126            serde_json::from_value(p).map_err(|e| JsonRpcError::invalid_params(e.to_string()))?
1127        } else {
1128            return Err(JsonRpcError::invalid_params("Missing params"));
1129        };
1130
1131        let resources = self.resources.read().await;
1132        let handler = resources
1133            .get(&params.uri)
1134            .ok_or_else(|| JsonRpcError::method_not_found(&params.uri))?;
1135
1136        match handler(params.uri.clone()) {
1137            Ok(result) => serde_json::to_value(result)
1138                .map_err(|e| JsonRpcError::internal_error(e.to_string())),
1139            Err(e) => Err(JsonRpcError::internal_error(e.to_string())),
1140        }
1141    }
1142
1143    /// Handle list prompts request
1144    async fn handle_list_prompts(&self, _params: Option<Value>) -> Result<Value, JsonRpcError> {
1145        let result = ListPromptsResult {
1146            prompts: vec![
1147                McpPrompt {
1148                    name: "code_review".to_string(),
1149                    description: Some("Review code for issues and improvements".to_string()),
1150                    arguments: vec![PromptArgument {
1151                        name: "file".to_string(),
1152                        description: Some("File to review".to_string()),
1153                        required: true,
1154                    }],
1155                },
1156                McpPrompt {
1157                    name: "explain_code".to_string(),
1158                    description: Some("Explain what code does".to_string()),
1159                    arguments: vec![PromptArgument {
1160                        name: "file".to_string(),
1161                        description: Some("File to explain".to_string()),
1162                        required: true,
1163                    }],
1164                },
1165            ],
1166            next_cursor: None,
1167        };
1168
1169        serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1170    }
1171
1172    /// Handle get prompt request
1173    async fn handle_get_prompt(&self, params: Option<Value>) -> Result<Value, JsonRpcError> {
1174        let params: GetPromptParams = if let Some(p) = params {
1175            serde_json::from_value(p).map_err(|e| JsonRpcError::invalid_params(e.to_string()))?
1176        } else {
1177            return Err(JsonRpcError::invalid_params("Missing params"));
1178        };
1179
1180        let result = match params.name.as_str() {
1181            "code_review" => {
1182                let file = params
1183                    .arguments
1184                    .get("file")
1185                    .and_then(|v| v.as_str())
1186                    .unwrap_or("file.rs");
1187
1188                GetPromptResult {
1189                    description: Some("Code review prompt".to_string()),
1190                    messages: vec![PromptMessage {
1191                        role: PromptRole::User,
1192                        content: PromptContent::Text {
1193                            text: format!(
1194                                "Please review the following code for:\n\
1195                                     - Bugs and potential issues\n\
1196                                     - Performance concerns\n\
1197                                     - Code style and best practices\n\
1198                                     - Security vulnerabilities\n\n\
1199                                     File: {}",
1200                                file
1201                            ),
1202                        },
1203                    }],
1204                }
1205            }
1206            "explain_code" => {
1207                let file = params
1208                    .arguments
1209                    .get("file")
1210                    .and_then(|v| v.as_str())
1211                    .unwrap_or("file.rs");
1212
1213                GetPromptResult {
1214                    description: Some("Code explanation prompt".to_string()),
1215                    messages: vec![PromptMessage {
1216                        role: PromptRole::User,
1217                        content: PromptContent::Text {
1218                            text: format!(
1219                                "Please explain what this code does, including:\n\
1220                                     - Overall purpose\n\
1221                                     - Key functions and their roles\n\
1222                                     - Data flow\n\
1223                                     - Important algorithms used\n\n\
1224                                     File: {}",
1225                                file
1226                            ),
1227                        },
1228                    }],
1229                }
1230            }
1231            _ => return Err(JsonRpcError::method_not_found(&params.name)),
1232        };
1233
1234        serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1235    }
1236}