Skip to main content

codetether_agent/mcp/
server.rs

1//! MCP Server - Exposes CodeTether tools to MCP clients
2//!
3//! Runs as a stdio-based MCP server that can be connected to by:
4//! - Claude Desktop
5//! - Other MCP clients
6//!
7//! Exposed tools include:
8//! - run_command: Execute shell commands
9//! - read_file: Read file contents
10//! - write_file: Write file contents
11//! - search_files: Search for files
12//! - swarm: Execute tasks with parallel sub-agents
13//! - rlm: Analyze large content
14//! - ralph: Autonomous PRD-driven execution
15
16use super::bus_bridge::BusBridge;
17use super::transport::{McpMessage, StdioTransport, Transport};
18use super::types::*;
19use crate::bus::{AgentBus, BusMessage};
20use crate::tool::ToolRegistry;
21use anyhow::Result;
22use serde_json::{Value, json};
23use std::collections::HashMap;
24use std::sync::Arc;
25use tokio::sync::RwLock;
26use tracing::{debug, info, warn};
27
28/// MCP Server implementation
29pub struct McpServer {
30    transport: Arc<dyn Transport>,
31    tools: RwLock<HashMap<String, McpToolHandler>>,
32    resources: RwLock<HashMap<String, McpResourceHandler>>,
33    /// Prompt handlers for MCP prompts (reserved for future use)
34    #[allow(dead_code)]
35    prompts: RwLock<HashMap<String, McpPromptHandler>>,
36    initialized: RwLock<bool>,
37    server_info: ServerInfo,
38    /// Tool metadata storage for querying tool information
39    metadata: RwLock<HashMap<String, ToolMetadata>>,
40    /// Resource metadata storage for querying resource information
41    resource_metadata: RwLock<HashMap<String, ResourceMetadata>>,
42    /// Optional bus bridge for live event monitoring
43    bus: Option<Arc<BusBridge>>,
44    /// Optional local agent bus for publishing tool calls to S3 sink
45    agent_bus: Option<Arc<AgentBus>>,
46    /// Optional tool registry bridging all CodeTether tools to MCP
47    tool_registry: Option<Arc<ToolRegistry>>,
48}
49
50type McpToolHandler = Arc<dyn Fn(Value) -> Result<CallToolResult> + Send + Sync>;
51type McpResourceHandler = Arc<dyn Fn(String) -> Result<ReadResourceResult> + Send + Sync>;
52type McpPromptHandler = Arc<dyn Fn(Value) -> Result<GetPromptResult> + Send + Sync>;
53
54impl McpServer {
55    /// Create a new MCP server over stdio
56    pub fn new_stdio() -> Self {
57        // Use Arc's unsized coercion to convert Arc<StdioTransport> -> Arc<dyn Transport>
58        let transport: Arc<dyn Transport> = Arc::new(StdioTransport::new());
59        Self::new(transport)
60    }
61
62    /// Create a new MCP server for in-process/local usage.
63    ///
64    /// Unlike [`Self::new_stdio`], this does not spawn any stdio reader/writer threads
65    /// and will not lock stdout. This is intended for CLI flows that need to query
66    /// tool metadata or invoke tools directly without running a long-lived stdio server.
67    pub fn new_local() -> Self {
68        let transport: Arc<dyn Transport> = Arc::new(super::transport::NullTransport::new());
69        Self::new(transport)
70    }
71
72    /// Create a new MCP server with custom transport
73    pub fn new(transport: Arc<dyn Transport>) -> Self {
74        let mut server = Self {
75            transport,
76            tools: RwLock::new(HashMap::new()),
77            resources: RwLock::new(HashMap::new()),
78            prompts: RwLock::new(HashMap::new()),
79            initialized: RwLock::new(false),
80            server_info: ServerInfo {
81                name: "codetether".to_string(),
82                version: env!("CARGO_PKG_VERSION").to_string(),
83            },
84            metadata: RwLock::new(HashMap::new()),
85            resource_metadata: RwLock::new(HashMap::new()),
86            bus: None,
87            agent_bus: None,
88            tool_registry: None,
89        };
90
91        // Register default tools
92        server.register_default_tools();
93
94        server
95    }
96
97    /// Attach the full CodeTether tool registry to the MCP server.
98    ///
99    /// All tools from the registry will be exposed as MCP tools, replacing
100    /// the hardcoded basic tool set. Call before [`Self::run`].
101    pub fn with_tool_registry(mut self, registry: Arc<ToolRegistry>) -> Self {
102        self.tool_registry = Some(registry);
103        self
104    }
105
106    /// Attach a local agent bus for publishing tool calls to the S3 sink.
107    ///
108    /// Call this *before* [`Self::run`] so every tool invocation gets
109    /// recorded as a `ToolRequest` + `ToolResponse` on the bus.
110    pub fn with_agent_bus(mut self, bus: Arc<AgentBus>) -> Self {
111        self.agent_bus = Some(bus);
112        self
113    }
114
115    /// Attach a bus bridge and register bus-aware tools + resources.
116    ///
117    /// Call this *before* [`Self::run`] to enable live bus monitoring.
118    pub async fn with_bus(mut self, bus_url: String) -> Self {
119        let bridge = BusBridge::new(bus_url).spawn();
120        self.bus = Some(Arc::clone(&bridge));
121        self.register_bus_tools(Arc::clone(&bridge)).await;
122        self.register_bus_resources(Arc::clone(&bridge)).await;
123        self
124    }
125
126    /// Register bus-specific MCP tools.
127    async fn register_bus_tools(&self, bridge: Arc<BusBridge>) {
128        // ── bus_events ──────────────────────────────────────────────
129        let b = Arc::clone(&bridge);
130        self.register_tool(
131            "bus_events",
132            "Query recent events from the agent bus. Returns BusEnvelope JSON objects \
133             matching the optional topic filter (supports wildcards like 'ralph.*').",
134            json!({
135                "type": "object",
136                "properties": {
137                    "topic_filter": {
138                        "type": "string",
139                        "description": "Topic pattern to filter (e.g. 'ralph.*', 'agent.*', '*'). Default: all."
140                    },
141                    "limit": {
142                        "type": "integer",
143                        "description": "Max events to return (default: 50, max: 500)"
144                    }
145                }
146            }),
147            Arc::new(move |args| {
148                let topic_filter = args.get("topic_filter").and_then(|v| v.as_str()).map(String::from);
149                let limit = args
150                    .get("limit")
151                    .and_then(|v| v.as_u64())
152                    .unwrap_or(50)
153                    .min(500) as usize;
154
155                let b = Arc::clone(&b);
156                let events = tokio::task::block_in_place(|| {
157                    tokio::runtime::Handle::current().block_on(async {
158                        b.recent_events(topic_filter.as_deref(), limit, None).await
159                    })
160                });
161
162                let text = serde_json::to_string_pretty(&events)
163                    .unwrap_or_else(|_| "[]".to_string());
164
165                Ok(CallToolResult {
166                    content: vec![ToolContent::Text { text }],
167                    is_error: false,
168                })
169            }),
170        )
171        .await;
172
173        // ── bus_status ──────────────────────────────────────────────
174        let b = Arc::clone(&bridge);
175        self.register_tool(
176            "bus_status",
177            "Get the current status of the bus bridge: connection state, event count, \
178             and buffer usage.",
179            json!({
180                "type": "object",
181                "properties": {}
182            }),
183            Arc::new(move |_args| {
184                let status = b.status();
185                let buffer_len = tokio::task::block_in_place(|| {
186                    tokio::runtime::Handle::current().block_on(b.buffer_len())
187                });
188
189                let text = serde_json::to_string_pretty(&json!({
190                    "connected": status.connected,
191                    "total_received": status.total_received,
192                    "buffer_used": buffer_len,
193                    "buffer_capacity": status.buffer_capacity,
194                    "bus_url": status.bus_url,
195                }))
196                .unwrap_or_default();
197
198                Ok(CallToolResult {
199                    content: vec![ToolContent::Text { text }],
200                    is_error: false,
201                })
202            }),
203        )
204        .await;
205
206        // ── ralph_status ────────────────────────────────────────────
207        self.register_tool(
208            "ralph_status",
209            "Get current Ralph PRD status: story pass/fail states, iteration count, \
210             and progress.txt content. Reads prd.json and progress.txt from the \
211             current working directory.",
212            json!({
213                "type": "object",
214                "properties": {
215                    "prd_path": {
216                        "type": "string",
217                        "description": "Path to prd.json (default: ./prd.json)"
218                    }
219                }
220            }),
221            Arc::new(|args| {
222                let prd_path = args
223                    .get("prd_path")
224                    .and_then(|v| v.as_str())
225                    .unwrap_or("prd.json");
226
227                let mut result = json!({});
228
229                // Read PRD
230                if let Ok(content) = std::fs::read_to_string(prd_path) {
231                    if let Ok(prd) = serde_json::from_str::<serde_json::Value>(&content) {
232                        let stories = prd.get("user_stories").and_then(|s| s.as_array());
233                        let passed = stories
234                            .map(|arr| {
235                                arr.iter()
236                                    .filter(|s| {
237                                        s.get("passes").and_then(|v| v.as_bool()).unwrap_or(false)
238                                    })
239                                    .count()
240                            })
241                            .unwrap_or(0);
242                        let total = stories.map(|arr| arr.len()).unwrap_or(0);
243
244                        result["prd"] = prd;
245                        result["summary"] = json!({
246                            "passed": passed,
247                            "total": total,
248                            "progress_pct": if total > 0 { (passed * 100) / total } else { 0 },
249                        });
250                    }
251                } else {
252                    result["prd_error"] = json!("prd.json not found");
253                }
254
255                // Read progress.txt
256                let progress_path = std::path::Path::new(prd_path)
257                    .parent()
258                    .unwrap_or(std::path::Path::new("."))
259                    .join("progress.txt");
260                if let Ok(progress) = std::fs::read_to_string(&progress_path) {
261                    result["progress"] = json!(progress);
262                }
263
264                let text = serde_json::to_string_pretty(&result).unwrap_or_default();
265
266                Ok(CallToolResult {
267                    content: vec![ToolContent::Text { text }],
268                    is_error: false,
269                })
270            }),
271        )
272        .await;
273
274        info!("Registered bus-aware MCP tools: bus_events, bus_status, ralph_status");
275    }
276
277    /// Register bus-specific MCP resources.
278    async fn register_bus_resources(&self, bridge: Arc<BusBridge>) {
279        // ── codetether://bus/events/recent ───────────────────────────
280        let b = Arc::clone(&bridge);
281        self.register_resource(
282            "codetether://bus/events/recent",
283            "Recent Bus Events",
284            "Last 100 events from the agent bus (JSON array of BusEnvelope)",
285            Some("application/json"),
286            Arc::new(move |_uri| {
287                let events = tokio::task::block_in_place(|| {
288                    tokio::runtime::Handle::current()
289                        .block_on(async { b.recent_events(None, 100, None).await })
290                });
291                let text =
292                    serde_json::to_string_pretty(&events).unwrap_or_else(|_| "[]".to_string());
293                Ok(ReadResourceResult {
294                    contents: vec![ResourceContents {
295                        uri: "codetether://bus/events/recent".to_string(),
296                        mime_type: Some("application/json".to_string()),
297                        content: ResourceContent::Text { text },
298                    }],
299                })
300            }),
301        )
302        .await;
303
304        // ── codetether://ralph/prd ──────────────────────────────────
305        self.register_resource(
306            "codetether://ralph/prd",
307            "Ralph PRD",
308            "Current PRD JSON with story pass/fail status",
309            Some("application/json"),
310            Arc::new(|_uri| {
311                let text = std::fs::read_to_string("prd.json")
312                    .unwrap_or_else(|_| r#"{"error": "prd.json not found"}"#.to_string());
313                Ok(ReadResourceResult {
314                    contents: vec![ResourceContents {
315                        uri: "codetether://ralph/prd".to_string(),
316                        mime_type: Some("application/json".to_string()),
317                        content: ResourceContent::Text { text },
318                    }],
319                })
320            }),
321        )
322        .await;
323
324        // ── codetether://ralph/progress ─────────────────────────────
325        self.register_resource(
326            "codetether://ralph/progress",
327            "Ralph Progress",
328            "progress.txt content from the current Ralph run",
329            Some("text/plain"),
330            Arc::new(|_uri| {
331                let text = std::fs::read_to_string("progress.txt")
332                    .unwrap_or_else(|_| "(no progress.txt found)".to_string());
333                Ok(ReadResourceResult {
334                    contents: vec![ResourceContents {
335                        uri: "codetether://ralph/progress".to_string(),
336                        mime_type: Some("text/plain".to_string()),
337                        content: ResourceContent::Text { text },
338                    }],
339                })
340            }),
341        )
342        .await;
343
344        info!("Registered bus-aware MCP resources");
345    }
346
347    /// Register default CodeTether tools
348    fn register_default_tools(&mut self) {
349        // These will be registered synchronously in the constructor
350        // The actual tool handlers will be added in run()
351    }
352
353    /// Register a tool
354    pub async fn register_tool(
355        &self,
356        name: &str,
357        description: &str,
358        input_schema: Value,
359        handler: McpToolHandler,
360    ) {
361        // Store tool metadata
362        let metadata = ToolMetadata::new(
363            name.to_string(),
364            Some(description.to_string()),
365            input_schema.clone(),
366        );
367
368        let mut metadata_map = self.metadata.write().await;
369        metadata_map.insert(name.to_string(), metadata);
370        drop(metadata_map);
371
372        let mut tools = self.tools.write().await;
373        tools.insert(name.to_string(), handler);
374
375        debug!("Registered MCP tool: {}", name);
376    }
377
378    /// Register a resource
379    pub async fn register_resource(
380        &self,
381        uri: &str,
382        name: &str,
383        description: &str,
384        mime_type: Option<&str>,
385        handler: McpResourceHandler,
386    ) {
387        // Store resource metadata
388        let metadata = ResourceMetadata::new(
389            uri.to_string(),
390            name.to_string(),
391            Some(description.to_string()),
392            mime_type.map(|s| s.to_string()),
393        );
394
395        let mut metadata_map = self.resource_metadata.write().await;
396        metadata_map.insert(uri.to_string(), metadata);
397        drop(metadata_map);
398
399        let mut resources = self.resources.write().await;
400        resources.insert(uri.to_string(), handler);
401
402        debug!("Registered MCP resource: {}", uri);
403    }
404
405    /// Get tool metadata by name
406    pub async fn get_tool_metadata(&self, name: &str) -> Option<ToolMetadata> {
407        let metadata = self.metadata.read().await;
408        metadata.get(name).cloned()
409    }
410
411    /// Get all tool metadata
412    pub async fn get_all_tool_metadata(&self) -> Vec<ToolMetadata> {
413        let metadata = self.metadata.read().await;
414        metadata.values().cloned().collect()
415    }
416
417    /// Get resource metadata by URI
418    pub async fn get_resource_metadata(&self, uri: &str) -> Option<ResourceMetadata> {
419        let metadata = self.resource_metadata.read().await;
420        metadata.get(uri).cloned()
421    }
422
423    /// Get all resource metadata
424    pub async fn get_all_resource_metadata(&self) -> Vec<ResourceMetadata> {
425        let metadata = self.resource_metadata.read().await;
426        metadata.values().cloned().collect()
427    }
428
429    /// Register a prompt handler
430    pub async fn register_prompt(&self, name: &str, handler: McpPromptHandler) {
431        let mut prompts = self.prompts.write().await;
432        prompts.insert(name.to_string(), handler);
433        debug!("Registered MCP prompt: {}", name);
434    }
435
436    /// Get a prompt handler by name
437    pub async fn get_prompt_handler(&self, name: &str) -> Option<McpPromptHandler> {
438        let prompts = self.prompts.read().await;
439        prompts.get(name).cloned()
440    }
441
442    /// List all registered prompt names
443    pub async fn list_prompts(&self) -> Vec<String> {
444        let prompts = self.prompts.read().await;
445        prompts.keys().cloned().collect()
446    }
447
448    /// Run the MCP server (main loop)
449    pub async fn run(&self) -> Result<()> {
450        info!("Starting MCP server...");
451
452        // Register tools before starting
453        self.setup_tools().await;
454
455        loop {
456            match self.transport.receive().await? {
457                Some(McpMessage::Request(request)) => {
458                    let response = self.handle_request(request).await;
459                    self.transport.send_response(response).await?;
460                }
461                Some(McpMessage::Notification(notification)) => {
462                    self.handle_notification(notification).await;
463                }
464                Some(McpMessage::Response(response)) => {
465                    // We received a response (shouldn't happen in server mode)
466                    warn!("Unexpected response received: {:?}", response.id);
467                }
468                None => {
469                    info!("Transport closed, shutting down MCP server");
470                    break;
471                }
472            }
473        }
474
475        Ok(())
476    }
477
478    /// Setup default tools (public, for CLI use)
479    pub async fn setup_tools_public(&self) {
480        self.setup_tools().await;
481    }
482
483    /// Call a tool directly without going through the transport
484    pub async fn call_tool_direct(&self, name: &str, arguments: Value) -> Result<CallToolResult> {
485        let tools = self.tools.read().await;
486        let handler = tools
487            .get(name)
488            .ok_or_else(|| anyhow::anyhow!("Tool not found: {}", name))?
489            .clone();
490        drop(tools);
491        handler(arguments)
492    }
493
494    /// Setup default tools
495    async fn setup_tools(&self) {
496        // If a ToolRegistry was provided, bridge ALL its tools into MCP
497        if let Some(registry) = &self.tool_registry {
498            self.register_registry_tools(registry).await;
499            info!(
500                "Registered {} MCP tools from ToolRegistry",
501                self.tools.read().await.len()
502            );
503            return;
504        }
505
506        // Fallback: register basic hardcoded tools when no registry is available
507        self.register_fallback_tools().await;
508        info!(
509            "Registered {} MCP tools (fallback)",
510            self.tools.read().await.len()
511        );
512    }
513
514    /// Bridge all tools from a ToolRegistry into MCP tool handlers.
515    async fn register_registry_tools(&self, registry: &Arc<ToolRegistry>) {
516        // Skip tools that don't make sense over MCP:
517        // - question: interactive TUI-only prompt
518        // - invalid: internal error handler
519        // - batch: needs weak registry reference, internal orchestration
520        // - confirm_edit / confirm_multiedit: dead TUI confirmation tools
521        // - plan_enter / plan_exit: session-state dependent TUI tools
522        // - voice / podcast / youtube / avatar / image: media generation tools
523        // - undo: git undo, dangerous without TUI context
524        let skip_tools = [
525            "question",
526            "invalid",
527            "batch",
528            "confirm_edit",
529            "confirm_multiedit",
530            "plan_enter",
531            "plan_exit",
532            "voice",
533            "podcast",
534            "youtube",
535            "avatar",
536            "image",
537            "undo",
538        ];
539
540        for tool_id in registry.list() {
541            if skip_tools.contains(&tool_id) {
542                continue;
543            }
544
545            let tool = match registry.get(tool_id) {
546                Some(t) => t,
547                None => continue,
548            };
549
550            let tool_clone = Arc::clone(&tool);
551
552            self.register_tool(
553                tool.id(),
554                tool.description(),
555                tool.parameters(),
556                Arc::new(move |args: Value| {
557                    let tool = Arc::clone(&tool_clone);
558                    let result = tokio::task::block_in_place(|| {
559                        tokio::runtime::Handle::current()
560                            .block_on(async { tool.execute(args).await })
561                    });
562
563                    match result {
564                        Ok(tool_result) => Ok(CallToolResult {
565                            content: vec![ToolContent::Text {
566                                text: tool_result.output,
567                            }],
568                            is_error: !tool_result.success,
569                        }),
570                        Err(e) => Ok(CallToolResult {
571                            content: vec![ToolContent::Text {
572                                text: e.to_string(),
573                            }],
574                            is_error: true,
575                        }),
576                    }
577                }),
578            )
579            .await;
580        }
581    }
582
583    /// Fallback hardcoded tools for when no ToolRegistry is provided.
584    async fn register_fallback_tools(&self) {
585        // run_command tool
586        self.register_tool(
587            "run_command",
588            "Execute a shell command and return the output",
589            json!({
590                "type": "object",
591                "properties": {
592                    "command": {
593                        "type": "string",
594                        "description": "The command to execute"
595                    },
596                    "cwd": {
597                        "type": "string",
598                        "description": "Working directory (optional)"
599                    },
600                    "timeout_ms": {
601                        "type": "integer",
602                        "description": "Timeout in milliseconds (default: 30000)"
603                    }
604                },
605                "required": ["command"]
606            }),
607            Arc::new(|args| {
608                let command = args
609                    .get("command")
610                    .and_then(|v| v.as_str())
611                    .ok_or_else(|| anyhow::anyhow!("Missing command"))?;
612
613                let cwd = args.get("cwd").and_then(|v| v.as_str());
614
615                let mut cmd = std::process::Command::new("/bin/sh");
616                cmd.arg("-c").arg(command);
617
618                if let Some(dir) = cwd {
619                    cmd.current_dir(dir);
620                }
621
622                let output = cmd.output()?;
623                let stdout = String::from_utf8_lossy(&output.stdout);
624                let stderr = String::from_utf8_lossy(&output.stderr);
625
626                let result = if output.status.success() {
627                    format!("{}{}", stdout, stderr)
628                } else {
629                    format!(
630                        "Exit code: {}\n{}{}",
631                        output.status.code().unwrap_or(-1),
632                        stdout,
633                        stderr
634                    )
635                };
636
637                Ok(CallToolResult {
638                    content: vec![ToolContent::Text { text: result }],
639                    is_error: !output.status.success(),
640                })
641            }),
642        )
643        .await;
644
645        // read_file tool
646        self.register_tool(
647            "read_file",
648            "Read the contents of a file",
649            json!({
650                "type": "object",
651                "properties": {
652                    "path": {
653                        "type": "string",
654                        "description": "Path to the file to read"
655                    },
656                    "offset": {
657                        "type": "integer",
658                        "description": "Line offset to start reading from (1-indexed)"
659                    },
660                    "limit": {
661                        "type": "integer",
662                        "description": "Maximum number of lines to read"
663                    }
664                },
665                "required": ["path"]
666            }),
667            Arc::new(|args| {
668                let path = args
669                    .get("path")
670                    .and_then(|v| v.as_str())
671                    .ok_or_else(|| anyhow::anyhow!("Missing path"))?;
672
673                let content = std::fs::read_to_string(path)?;
674
675                let offset = args.get("offset").and_then(|v| v.as_u64()).unwrap_or(1) as usize;
676                let limit = args.get("limit").and_then(|v| v.as_u64());
677
678                let lines: Vec<&str> = content.lines().collect();
679                let start = (offset.saturating_sub(1)).min(lines.len());
680                let end = if let Some(l) = limit {
681                    (start + l as usize).min(lines.len())
682                } else {
683                    lines.len()
684                };
685
686                let result = lines[start..end].join("\n");
687
688                Ok(CallToolResult {
689                    content: vec![ToolContent::Text { text: result }],
690                    is_error: false,
691                })
692            }),
693        )
694        .await;
695
696        // write_file tool
697        self.register_tool(
698            "write_file",
699            "Write content to a file",
700            json!({
701                "type": "object",
702                "properties": {
703                    "path": {
704                        "type": "string",
705                        "description": "Path to the file to write"
706                    },
707                    "content": {
708                        "type": "string",
709                        "description": "Content to write"
710                    },
711                    "create_dirs": {
712                        "type": "boolean",
713                        "description": "Create parent directories if they don't exist"
714                    }
715                },
716                "required": ["path", "content"]
717            }),
718            Arc::new(|args| {
719                let path = args
720                    .get("path")
721                    .and_then(|v| v.as_str())
722                    .ok_or_else(|| anyhow::anyhow!("Missing path"))?;
723
724                let content = args
725                    .get("content")
726                    .and_then(|v| v.as_str())
727                    .ok_or_else(|| anyhow::anyhow!("Missing content"))?;
728
729                let create_dirs = args
730                    .get("create_dirs")
731                    .and_then(|v| v.as_bool())
732                    .unwrap_or(false);
733
734                if create_dirs {
735                    if let Some(parent) = std::path::Path::new(path).parent() {
736                        std::fs::create_dir_all(parent)?;
737                    }
738                }
739
740                std::fs::write(path, content)?;
741
742                Ok(CallToolResult {
743                    content: vec![ToolContent::Text {
744                        text: format!("Wrote {} bytes to {}", content.len(), path),
745                    }],
746                    is_error: false,
747                })
748            }),
749        )
750        .await;
751
752        // list_directory tool
753        self.register_tool(
754            "list_directory",
755            "List contents of a directory",
756            json!({
757                "type": "object",
758                "properties": {
759                    "path": {
760                        "type": "string",
761                        "description": "Path to the directory"
762                    },
763                    "recursive": {
764                        "type": "boolean",
765                        "description": "List recursively"
766                    },
767                    "max_depth": {
768                        "type": "integer",
769                        "description": "Maximum depth for recursive listing"
770                    }
771                },
772                "required": ["path"]
773            }),
774            Arc::new(|args| {
775                let path = args
776                    .get("path")
777                    .and_then(|v| v.as_str())
778                    .ok_or_else(|| anyhow::anyhow!("Missing path"))?;
779
780                let mut entries = Vec::new();
781                for entry in std::fs::read_dir(path)? {
782                    let entry = entry?;
783                    let file_type = entry.file_type()?;
784                    let name = entry.file_name().to_string_lossy().to_string();
785                    let suffix = if file_type.is_dir() { "/" } else { "" };
786                    entries.push(format!("{}{}", name, suffix));
787                }
788
789                entries.sort();
790
791                Ok(CallToolResult {
792                    content: vec![ToolContent::Text {
793                        text: entries.join("\n"),
794                    }],
795                    is_error: false,
796                })
797            }),
798        )
799        .await;
800
801        // search_files tool
802        self.register_tool(
803            "search_files",
804            "Search for files matching a pattern",
805            json!({
806                "type": "object",
807                "properties": {
808                    "pattern": {
809                        "type": "string",
810                        "description": "Search pattern (glob or regex)"
811                    },
812                    "path": {
813                        "type": "string",
814                        "description": "Directory to search in"
815                    },
816                    "content_pattern": {
817                        "type": "string",
818                        "description": "Pattern to search in file contents"
819                    }
820                },
821                "required": ["pattern"]
822            }),
823            Arc::new(|args| {
824                let pattern = args
825                    .get("pattern")
826                    .and_then(|v| v.as_str())
827                    .ok_or_else(|| anyhow::anyhow!("Missing pattern"))?;
828
829                let path = args.get("path").and_then(|v| v.as_str()).unwrap_or(".");
830
831                // Simple glob using find command
832                let output = std::process::Command::new("find")
833                    .args([path, "-name", pattern, "-type", "f"])
834                    .output()?;
835
836                let result = String::from_utf8_lossy(&output.stdout);
837
838                Ok(CallToolResult {
839                    content: vec![ToolContent::Text {
840                        text: result.to_string(),
841                    }],
842                    is_error: !output.status.success(),
843                })
844            }),
845        )
846        .await;
847
848        // grep_search tool
849        self.register_tool(
850            "grep_search",
851            "Search file contents using grep",
852            json!({
853                "type": "object",
854                "properties": {
855                    "query": {
856                        "type": "string",
857                        "description": "Search pattern"
858                    },
859                    "path": {
860                        "type": "string",
861                        "description": "Directory or file to search"
862                    },
863                    "is_regex": {
864                        "type": "boolean",
865                        "description": "Treat pattern as regex"
866                    },
867                    "case_sensitive": {
868                        "type": "boolean",
869                        "description": "Case-sensitive search"
870                    }
871                },
872                "required": ["query"]
873            }),
874            Arc::new(|args| {
875                let query = args
876                    .get("query")
877                    .and_then(|v| v.as_str())
878                    .ok_or_else(|| anyhow::anyhow!("Missing query"))?;
879
880                let path = args.get("path").and_then(|v| v.as_str()).unwrap_or(".");
881
882                let is_regex = args
883                    .get("is_regex")
884                    .and_then(|v| v.as_bool())
885                    .unwrap_or(false);
886
887                let case_sensitive = args
888                    .get("case_sensitive")
889                    .and_then(|v| v.as_bool())
890                    .unwrap_or(false);
891
892                let mut cmd = std::process::Command::new("grep");
893                cmd.arg("-r").arg("-n");
894
895                if !case_sensitive {
896                    cmd.arg("-i");
897                }
898
899                if is_regex {
900                    cmd.arg("-E");
901                } else {
902                    cmd.arg("-F");
903                }
904
905                cmd.arg(query).arg(path);
906
907                let output = cmd.output()?;
908                let result = String::from_utf8_lossy(&output.stdout);
909
910                Ok(CallToolResult {
911                    content: vec![ToolContent::Text {
912                        text: result.to_string(),
913                    }],
914                    is_error: false,
915                })
916            }),
917        )
918        .await;
919
920        info!("Registered {} MCP tools", self.tools.read().await.len());
921    }
922
923    /// Handle a JSON-RPC request
924    async fn handle_request(&self, request: JsonRpcRequest) -> JsonRpcResponse {
925        debug!(
926            "Handling request: {} (id: {:?})",
927            request.method, request.id
928        );
929
930        let result = match request.method.as_str() {
931            "initialize" => self.handle_initialize(request.params).await,
932            "initialized" => Ok(json!({})),
933            "ping" => Ok(json!({})),
934            "tools/list" => self.handle_list_tools(request.params).await,
935            "tools/call" => self.handle_call_tool(request.params).await,
936            "resources/list" => self.handle_list_resources(request.params).await,
937            "resources/read" => self.handle_read_resource(request.params).await,
938            "prompts/list" => self.handle_list_prompts(request.params).await,
939            "prompts/get" => self.handle_get_prompt(request.params).await,
940            _ => Err(JsonRpcError::method_not_found(&request.method)),
941        };
942
943        match result {
944            Ok(value) => JsonRpcResponse::success(request.id, value),
945            Err(error) => JsonRpcResponse::error(request.id, error),
946        }
947    }
948
949    /// Handle a notification
950    async fn handle_notification(&self, notification: JsonRpcNotification) {
951        debug!("Handling notification: {}", notification.method);
952
953        match notification.method.as_str() {
954            "notifications/initialized" => {
955                *self.initialized.write().await = true;
956                info!("MCP client initialized");
957            }
958            "notifications/cancelled" => {
959                // Handle cancellation
960            }
961            _ => {
962                debug!("Unknown notification: {}", notification.method);
963            }
964        }
965    }
966
967    /// Handle initialize request
968    async fn handle_initialize(&self, params: Option<Value>) -> Result<Value, JsonRpcError> {
969        let _params: InitializeParams = if let Some(p) = params {
970            serde_json::from_value(p).map_err(|e| JsonRpcError::invalid_params(e.to_string()))?
971        } else {
972            return Err(JsonRpcError::invalid_params("Missing params"));
973        };
974
975        let result = InitializeResult {
976            protocol_version: PROTOCOL_VERSION.to_string(),
977            capabilities: ServerCapabilities {
978                tools: Some(ToolsCapability { list_changed: true }),
979                resources: Some(ResourcesCapability {
980                    subscribe: false,
981                    list_changed: true,
982                }),
983                prompts: Some(PromptsCapability { list_changed: true }),
984                logging: Some(LoggingCapability {}),
985                experimental: None,
986            },
987            server_info: self.server_info.clone(),
988            instructions: Some(
989                "CodeTether is an AI coding agent with tools for file operations, \
990                 command execution, code search, and autonomous task execution. \
991                 Use the swarm tool for complex tasks requiring parallel execution, \
992                 and ralph for PRD-driven development."
993                    .to_string(),
994            ),
995        };
996
997        serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
998    }
999
1000    /// Handle list tools request - reads from ToolMetadata registry
1001    async fn handle_list_tools(&self, _params: Option<Value>) -> Result<Value, JsonRpcError> {
1002        // Read tools from the metadata registry instead of hardcoded list
1003        let metadata = self.metadata.read().await;
1004
1005        let tool_list: Vec<McpTool> = metadata
1006            .values()
1007            .map(|tm| McpTool {
1008                name: tm.name.clone(),
1009                description: tm.description.clone(),
1010                input_schema: tm.input_schema.clone(),
1011            })
1012            .collect();
1013
1014        let result = ListToolsResult {
1015            tools: tool_list,
1016            next_cursor: None,
1017        };
1018
1019        serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1020    }
1021
1022    /// Handle call tool request
1023    async fn handle_call_tool(&self, params: Option<Value>) -> Result<Value, JsonRpcError> {
1024        let params: CallToolParams = if let Some(p) = params {
1025            serde_json::from_value(p).map_err(|e| JsonRpcError::invalid_params(e.to_string()))?
1026        } else {
1027            return Err(JsonRpcError::invalid_params("Missing params"));
1028        };
1029
1030        // Publish ToolRequest to the agent bus (picked up by S3 sink)
1031        let request_id = uuid::Uuid::new_v4().to_string();
1032        let bus_handle = self.agent_bus.as_ref().map(|bus| bus.handle("mcp-server"));
1033        if let Some(ref bh) = bus_handle {
1034            bh.send(
1035                format!("tools.{}", &params.name),
1036                BusMessage::ToolRequest {
1037                    request_id: request_id.clone(),
1038                    agent_id: "mcp-server".into(),
1039                    tool_name: params.name.clone(),
1040                    arguments: params.arguments.clone(),
1041                },
1042            );
1043        }
1044
1045        let tools = self.tools.read().await;
1046        let handler = tools
1047            .get(&params.name)
1048            .ok_or_else(|| JsonRpcError::method_not_found(&params.name))?;
1049
1050        let (result_value, output_text, success) = match handler(params.arguments) {
1051            Ok(result) => {
1052                let text = result
1053                    .content
1054                    .iter()
1055                    .filter_map(|c| match c {
1056                        ToolContent::Text { text } => Some(text.as_str()),
1057                        _ => None,
1058                    })
1059                    .collect::<Vec<_>>()
1060                    .join("\n");
1061                let is_err = result.is_error;
1062                let val = serde_json::to_value(result)
1063                    .map_err(|e| JsonRpcError::internal_error(e.to_string()))?;
1064                (val, text, !is_err)
1065            }
1066            Err(e) => {
1067                let err_text = e.to_string();
1068                let result = CallToolResult {
1069                    content: vec![ToolContent::Text {
1070                        text: err_text.clone(),
1071                    }],
1072                    is_error: true,
1073                };
1074                let val = serde_json::to_value(result)
1075                    .map_err(|e| JsonRpcError::internal_error(e.to_string()))?;
1076                (val, err_text, false)
1077            }
1078        };
1079
1080        // Publish ToolResponse to the agent bus
1081        if let Some(ref bh) = bus_handle {
1082            bh.send(
1083                format!("tools.{}", &params.name),
1084                BusMessage::ToolResponse {
1085                    request_id,
1086                    agent_id: "mcp-server".into(),
1087                    tool_name: params.name,
1088                    result: output_text,
1089                    success,
1090                },
1091            );
1092        }
1093
1094        Ok(result_value)
1095    }
1096
1097    /// Handle list resources request — reads from registered resource metadata
1098    async fn handle_list_resources(&self, _params: Option<Value>) -> Result<Value, JsonRpcError> {
1099        let metadata = self.resource_metadata.read().await;
1100        let resource_list: Vec<McpResource> = metadata
1101            .values()
1102            .map(|rm| McpResource {
1103                uri: rm.uri.clone(),
1104                name: rm.name.clone(),
1105                description: rm.description.clone(),
1106                mime_type: rm.mime_type.clone(),
1107            })
1108            .collect();
1109
1110        let result = ListResourcesResult {
1111            resources: resource_list,
1112            next_cursor: None,
1113        };
1114
1115        serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1116    }
1117
1118    /// Handle read resource request
1119    async fn handle_read_resource(&self, params: Option<Value>) -> Result<Value, JsonRpcError> {
1120        let params: ReadResourceParams = if let Some(p) = params {
1121            serde_json::from_value(p).map_err(|e| JsonRpcError::invalid_params(e.to_string()))?
1122        } else {
1123            return Err(JsonRpcError::invalid_params("Missing params"));
1124        };
1125
1126        let resources = self.resources.read().await;
1127        let handler = resources
1128            .get(&params.uri)
1129            .ok_or_else(|| JsonRpcError::method_not_found(&params.uri))?;
1130
1131        match handler(params.uri.clone()) {
1132            Ok(result) => serde_json::to_value(result)
1133                .map_err(|e| JsonRpcError::internal_error(e.to_string())),
1134            Err(e) => Err(JsonRpcError::internal_error(e.to_string())),
1135        }
1136    }
1137
1138    /// Handle list prompts request
1139    async fn handle_list_prompts(&self, _params: Option<Value>) -> Result<Value, JsonRpcError> {
1140        let result = ListPromptsResult {
1141            prompts: vec![
1142                McpPrompt {
1143                    name: "code_review".to_string(),
1144                    description: Some("Review code for issues and improvements".to_string()),
1145                    arguments: vec![PromptArgument {
1146                        name: "file".to_string(),
1147                        description: Some("File to review".to_string()),
1148                        required: true,
1149                    }],
1150                },
1151                McpPrompt {
1152                    name: "explain_code".to_string(),
1153                    description: Some("Explain what code does".to_string()),
1154                    arguments: vec![PromptArgument {
1155                        name: "file".to_string(),
1156                        description: Some("File to explain".to_string()),
1157                        required: true,
1158                    }],
1159                },
1160            ],
1161            next_cursor: None,
1162        };
1163
1164        serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1165    }
1166
1167    /// Handle get prompt request
1168    async fn handle_get_prompt(&self, params: Option<Value>) -> Result<Value, JsonRpcError> {
1169        let params: GetPromptParams = if let Some(p) = params {
1170            serde_json::from_value(p).map_err(|e| JsonRpcError::invalid_params(e.to_string()))?
1171        } else {
1172            return Err(JsonRpcError::invalid_params("Missing params"));
1173        };
1174
1175        let result = match params.name.as_str() {
1176            "code_review" => {
1177                let file = params
1178                    .arguments
1179                    .get("file")
1180                    .and_then(|v| v.as_str())
1181                    .unwrap_or("file.rs");
1182
1183                GetPromptResult {
1184                    description: Some("Code review prompt".to_string()),
1185                    messages: vec![PromptMessage {
1186                        role: PromptRole::User,
1187                        content: PromptContent::Text {
1188                            text: format!(
1189                                "Please review the following code for:\n\
1190                                     - Bugs and potential issues\n\
1191                                     - Performance concerns\n\
1192                                     - Code style and best practices\n\
1193                                     - Security vulnerabilities\n\n\
1194                                     File: {}",
1195                                file
1196                            ),
1197                        },
1198                    }],
1199                }
1200            }
1201            "explain_code" => {
1202                let file = params
1203                    .arguments
1204                    .get("file")
1205                    .and_then(|v| v.as_str())
1206                    .unwrap_or("file.rs");
1207
1208                GetPromptResult {
1209                    description: Some("Code explanation prompt".to_string()),
1210                    messages: vec![PromptMessage {
1211                        role: PromptRole::User,
1212                        content: PromptContent::Text {
1213                            text: format!(
1214                                "Please explain what this code does, including:\n\
1215                                     - Overall purpose\n\
1216                                     - Key functions and their roles\n\
1217                                     - Data flow\n\
1218                                     - Important algorithms used\n\n\
1219                                     File: {}",
1220                                file
1221                            ),
1222                        },
1223                    }],
1224                }
1225            }
1226            _ => return Err(JsonRpcError::method_not_found(&params.name)),
1227        };
1228
1229        serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1230    }
1231}