1use super::bus_bridge::BusBridge;
17use super::transport::{McpMessage, StdioTransport, Transport};
18use super::types::*;
19use crate::bus::{AgentBus, BusMessage};
20use crate::tool::ToolRegistry;
21use anyhow::Result;
22use serde_json::{Value, json};
23use std::collections::HashMap;
24use std::sync::Arc;
25use tokio::sync::RwLock;
26use tracing::{debug, info, warn};
27
/// MCP server that dispatches JSON-RPC messages arriving on a [`Transport`]
/// to registered tool, resource and prompt handlers.
pub struct McpServer {
    /// Transport that `run` receives messages from and sends responses to.
    transport: Arc<dyn Transport>,
    /// Tool handlers keyed by tool name.
    tools: RwLock<HashMap<String, McpToolHandler>>,
    /// Resource handlers keyed by resource URI.
    resources: RwLock<HashMap<String, McpResourceHandler>>,
    // NOTE(review): this map is read by `get_prompt_handler`/`list_prompts`;
    // the allow is presumably for builds where those accessors are unused — confirm.
    #[allow(dead_code)]
    /// Prompt handlers keyed by prompt name.
    prompts: RwLock<HashMap<String, McpPromptHandler>>,
    /// Set to `true` once the `notifications/initialized` notification arrives.
    initialized: RwLock<bool>,
    /// Name and version reported in the `initialize` response.
    server_info: ServerInfo,
    /// Tool metadata keyed by tool name; this is what `tools/list` serves.
    metadata: RwLock<HashMap<String, ToolMetadata>>,
    /// Resource metadata keyed by URI; this is what `resources/list` serves.
    resource_metadata: RwLock<HashMap<String, ResourceMetadata>>,
    /// Bus bridge stored by `with_bus_auth`; `None` until a bus is attached.
    bus: Option<Arc<BusBridge>>,
    /// In-process agent bus that `tools/call` publishes request/response messages to.
    agent_bus: Option<Arc<AgentBus>>,
    /// When set, `setup_tools` registers tools from this registry instead of the fallbacks.
    tool_registry: Option<Arc<ToolRegistry>>,
}
49
/// Synchronous tool callback: receives the call's JSON arguments and returns the tool result.
type McpToolHandler = Arc<dyn Fn(Value) -> Result<CallToolResult> + Send + Sync>;
/// Synchronous resource callback: receives the requested URI and returns its contents.
type McpResourceHandler = Arc<dyn Fn(String) -> Result<ReadResourceResult> + Send + Sync>;
/// Synchronous prompt callback: receives JSON arguments and returns the rendered prompt.
type McpPromptHandler = Arc<dyn Fn(Value) -> Result<GetPromptResult> + Send + Sync>;
53
54impl McpServer {
55 pub fn new_stdio() -> Self {
57 let transport: Arc<dyn Transport> = Arc::new(StdioTransport::new());
59 Self::new(transport)
60 }
61
62 pub fn new_local() -> Self {
68 let transport: Arc<dyn Transport> = Arc::new(super::transport::NullTransport::new());
69 Self::new(transport)
70 }
71
72 pub fn new(transport: Arc<dyn Transport>) -> Self {
74 let mut server = Self {
75 transport,
76 tools: RwLock::new(HashMap::new()),
77 resources: RwLock::new(HashMap::new()),
78 prompts: RwLock::new(HashMap::new()),
79 initialized: RwLock::new(false),
80 server_info: ServerInfo {
81 name: "codetether".to_string(),
82 version: env!("CARGO_PKG_VERSION").to_string(),
83 },
84 metadata: RwLock::new(HashMap::new()),
85 resource_metadata: RwLock::new(HashMap::new()),
86 bus: None,
87 agent_bus: None,
88 tool_registry: None,
89 };
90
91 server.register_default_tools();
93
94 server
95 }
96
97 pub fn with_tool_registry(mut self, registry: Arc<ToolRegistry>) -> Self {
102 self.tool_registry = Some(registry);
103 self
104 }
105
106 pub fn with_agent_bus(mut self, bus: Arc<AgentBus>) -> Self {
111 self.agent_bus = Some(bus);
112 self
113 }
114
115 pub async fn with_bus(self, bus_url: String) -> Self {
119 self.with_bus_auth(bus_url, None).await
120 }
121
122 pub async fn with_bus_auth(mut self, bus_url: String, auth_token: Option<String>) -> Self {
124 let bridge = BusBridge::with_auth(bus_url, auth_token).spawn();
125 self.bus = Some(Arc::clone(&bridge));
126 self.register_bus_tools(Arc::clone(&bridge)).await;
127 self.register_bus_resources(Arc::clone(&bridge)).await;
128 self
129 }
130
131 async fn register_bus_tools(&self, bridge: Arc<BusBridge>) {
133 let b = Arc::clone(&bridge);
135 self.register_tool(
136 "bus_events",
137 "Query recent events from the agent bus. Returns BusEnvelope JSON objects \
138 matching the optional topic filter (supports wildcards like 'ralph.*').",
139 json!({
140 "type": "object",
141 "properties": {
142 "topic_filter": {
143 "type": "string",
144 "description": "Topic pattern to filter (e.g. 'ralph.*', 'agent.*', '*'). Default: all."
145 },
146 "limit": {
147 "type": "integer",
148 "description": "Max events to return (default: 50, max: 500)"
149 }
150 }
151 }),
152 Arc::new(move |args| {
153 let topic_filter = args.get("topic_filter").and_then(|v| v.as_str()).map(String::from);
154 let limit = args
155 .get("limit")
156 .and_then(|v| v.as_u64())
157 .unwrap_or(50)
158 .min(500) as usize;
159
160 let b = Arc::clone(&b);
161 let events = tokio::task::block_in_place(|| {
162 tokio::runtime::Handle::current().block_on(async {
163 b.recent_events(topic_filter.as_deref(), limit, None).await
164 })
165 });
166
167 let text = serde_json::to_string_pretty(&events)
168 .unwrap_or_else(|_| "[]".to_string());
169
170 Ok(CallToolResult {
171 content: vec![ToolContent::Text { text }],
172 is_error: false,
173 })
174 }),
175 )
176 .await;
177
178 let b = Arc::clone(&bridge);
180 self.register_tool(
181 "bus_status",
182 "Get the current status of the bus bridge: connection state, event count, \
183 and buffer usage.",
184 json!({
185 "type": "object",
186 "properties": {}
187 }),
188 Arc::new(move |_args| {
189 let status = b.status();
190 let buffer_len = tokio::task::block_in_place(|| {
191 tokio::runtime::Handle::current().block_on(b.buffer_len())
192 });
193
194 let text = serde_json::to_string_pretty(&json!({
195 "connected": status.connected,
196 "total_received": status.total_received,
197 "buffer_used": buffer_len,
198 "buffer_capacity": status.buffer_capacity,
199 "bus_url": status.bus_url,
200 }))
201 .unwrap_or_default();
202
203 Ok(CallToolResult {
204 content: vec![ToolContent::Text { text }],
205 is_error: false,
206 })
207 }),
208 )
209 .await;
210
211 self.register_tool(
213 "ralph_status",
214 "Get current Ralph PRD status: story pass/fail states, iteration count, \
215 and progress.txt content. Reads prd.json and progress.txt from the \
216 current working directory.",
217 json!({
218 "type": "object",
219 "properties": {
220 "prd_path": {
221 "type": "string",
222 "description": "Path to prd.json (default: ./prd.json)"
223 }
224 }
225 }),
226 Arc::new(|args| {
227 let prd_path = args
228 .get("prd_path")
229 .and_then(|v| v.as_str())
230 .unwrap_or("prd.json");
231
232 let mut result = json!({});
233
234 if let Ok(content) = std::fs::read_to_string(prd_path) {
236 if let Ok(prd) = serde_json::from_str::<serde_json::Value>(&content) {
237 let stories = prd.get("user_stories").and_then(|s| s.as_array());
238 let passed = stories
239 .map(|arr| {
240 arr.iter()
241 .filter(|s| {
242 s.get("passes").and_then(|v| v.as_bool()).unwrap_or(false)
243 })
244 .count()
245 })
246 .unwrap_or(0);
247 let total = stories.map(|arr| arr.len()).unwrap_or(0);
248
249 result["prd"] = prd;
250 result["summary"] = json!({
251 "passed": passed,
252 "total": total,
253 "progress_pct": if total > 0 { (passed * 100) / total } else { 0 },
254 });
255 }
256 } else {
257 result["prd_error"] = json!("prd.json not found");
258 }
259
260 let progress_path = std::path::Path::new(prd_path)
262 .parent()
263 .unwrap_or(std::path::Path::new("."))
264 .join("progress.txt");
265 if let Ok(progress) = std::fs::read_to_string(&progress_path) {
266 result["progress"] = json!(progress);
267 }
268
269 let text = serde_json::to_string_pretty(&result).unwrap_or_default();
270
271 Ok(CallToolResult {
272 content: vec![ToolContent::Text { text }],
273 is_error: false,
274 })
275 }),
276 )
277 .await;
278
279 info!("Registered bus-aware MCP tools: bus_events, bus_status, ralph_status");
280 }
281
282 async fn register_bus_resources(&self, bridge: Arc<BusBridge>) {
284 let b = Arc::clone(&bridge);
286 self.register_resource(
287 "codetether://bus/events/recent",
288 "Recent Bus Events",
289 "Last 100 events from the agent bus (JSON array of BusEnvelope)",
290 Some("application/json"),
291 Arc::new(move |_uri| {
292 let events = tokio::task::block_in_place(|| {
293 tokio::runtime::Handle::current()
294 .block_on(async { b.recent_events(None, 100, None).await })
295 });
296 let text =
297 serde_json::to_string_pretty(&events).unwrap_or_else(|_| "[]".to_string());
298 Ok(ReadResourceResult {
299 contents: vec![ResourceContents {
300 uri: "codetether://bus/events/recent".to_string(),
301 mime_type: Some("application/json".to_string()),
302 content: ResourceContent::Text { text },
303 }],
304 })
305 }),
306 )
307 .await;
308
309 self.register_resource(
311 "codetether://ralph/prd",
312 "Ralph PRD",
313 "Current PRD JSON with story pass/fail status",
314 Some("application/json"),
315 Arc::new(|_uri| {
316 let text = std::fs::read_to_string("prd.json")
317 .unwrap_or_else(|_| r#"{"error": "prd.json not found"}"#.to_string());
318 Ok(ReadResourceResult {
319 contents: vec![ResourceContents {
320 uri: "codetether://ralph/prd".to_string(),
321 mime_type: Some("application/json".to_string()),
322 content: ResourceContent::Text { text },
323 }],
324 })
325 }),
326 )
327 .await;
328
329 self.register_resource(
331 "codetether://ralph/progress",
332 "Ralph Progress",
333 "progress.txt content from the current Ralph run",
334 Some("text/plain"),
335 Arc::new(|_uri| {
336 let text = std::fs::read_to_string("progress.txt")
337 .unwrap_or_else(|_| "(no progress.txt found)".to_string());
338 Ok(ReadResourceResult {
339 contents: vec![ResourceContents {
340 uri: "codetether://ralph/progress".to_string(),
341 mime_type: Some("text/plain".to_string()),
342 content: ResourceContent::Text { text },
343 }],
344 })
345 }),
346 )
347 .await;
348
349 info!("Registered bus-aware MCP resources");
350 }
351
    /// Hook invoked by `new` right after construction. The body is currently
    /// empty; tools are registered later by `setup_tools`.
    fn register_default_tools(&mut self) {
    }
357
358 pub async fn register_tool(
360 &self,
361 name: &str,
362 description: &str,
363 input_schema: Value,
364 handler: McpToolHandler,
365 ) {
366 let metadata = ToolMetadata::new(
368 name.to_string(),
369 Some(description.to_string()),
370 input_schema.clone(),
371 );
372
373 let mut metadata_map = self.metadata.write().await;
374 metadata_map.insert(name.to_string(), metadata);
375 drop(metadata_map);
376
377 let mut tools = self.tools.write().await;
378 tools.insert(name.to_string(), handler);
379
380 debug!("Registered MCP tool: {}", name);
381 }
382
383 pub async fn register_resource(
385 &self,
386 uri: &str,
387 name: &str,
388 description: &str,
389 mime_type: Option<&str>,
390 handler: McpResourceHandler,
391 ) {
392 let metadata = ResourceMetadata::new(
394 uri.to_string(),
395 name.to_string(),
396 Some(description.to_string()),
397 mime_type.map(|s| s.to_string()),
398 );
399
400 let mut metadata_map = self.resource_metadata.write().await;
401 metadata_map.insert(uri.to_string(), metadata);
402 drop(metadata_map);
403
404 let mut resources = self.resources.write().await;
405 resources.insert(uri.to_string(), handler);
406
407 debug!("Registered MCP resource: {}", uri);
408 }
409
410 pub async fn get_tool_metadata(&self, name: &str) -> Option<ToolMetadata> {
412 let metadata = self.metadata.read().await;
413 metadata.get(name).cloned()
414 }
415
416 pub async fn get_all_tool_metadata(&self) -> Vec<ToolMetadata> {
418 let metadata = self.metadata.read().await;
419 metadata.values().cloned().collect()
420 }
421
422 pub async fn get_resource_metadata(&self, uri: &str) -> Option<ResourceMetadata> {
424 let metadata = self.resource_metadata.read().await;
425 metadata.get(uri).cloned()
426 }
427
428 pub async fn get_all_resource_metadata(&self) -> Vec<ResourceMetadata> {
430 let metadata = self.resource_metadata.read().await;
431 metadata.values().cloned().collect()
432 }
433
434 pub async fn register_prompt(&self, name: &str, handler: McpPromptHandler) {
436 let mut prompts = self.prompts.write().await;
437 prompts.insert(name.to_string(), handler);
438 debug!("Registered MCP prompt: {}", name);
439 }
440
441 pub async fn get_prompt_handler(&self, name: &str) -> Option<McpPromptHandler> {
443 let prompts = self.prompts.read().await;
444 prompts.get(name).cloned()
445 }
446
447 pub async fn list_prompts(&self) -> Vec<String> {
449 let prompts = self.prompts.read().await;
450 prompts.keys().cloned().collect()
451 }
452
453 pub async fn run(&self) -> Result<()> {
455 info!("Starting MCP server...");
456
457 self.setup_tools().await;
459
460 loop {
461 match self.transport.receive().await? {
462 Some(McpMessage::Request(request)) => {
463 let response = self.handle_request(request).await;
464 self.transport.send_response(response).await?;
465 }
466 Some(McpMessage::Notification(notification)) => {
467 self.handle_notification(notification).await;
468 }
469 Some(McpMessage::Response(response)) => {
470 warn!("Unexpected response received: {:?}", response.id);
472 }
473 None => {
474 info!("Transport closed, shutting down MCP server");
475 break;
476 }
477 }
478 }
479
480 Ok(())
481 }
482
483 pub async fn setup_tools_public(&self) {
485 self.setup_tools().await;
486 }
487
488 pub async fn call_tool_direct(&self, name: &str, arguments: Value) -> Result<CallToolResult> {
490 let tools = self.tools.read().await;
491 let handler = tools
492 .get(name)
493 .ok_or_else(|| anyhow::anyhow!("Tool not found: {}", name))?
494 .clone();
495 drop(tools);
496 handler(arguments)
497 }
498
499 async fn setup_tools(&self) {
501 if let Some(registry) = &self.tool_registry {
503 self.register_registry_tools(registry).await;
504 info!(
505 "Registered {} MCP tools from ToolRegistry",
506 self.tools.read().await.len()
507 );
508 return;
509 }
510
511 self.register_fallback_tools().await;
513 info!(
514 "Registered {} MCP tools (fallback)",
515 self.tools.read().await.len()
516 );
517 }
518
519 async fn register_registry_tools(&self, registry: &Arc<ToolRegistry>) {
521 let skip_tools = [
530 "question",
531 "invalid",
532 "batch",
533 "confirm_edit",
534 "confirm_multiedit",
535 "plan_enter",
536 "plan_exit",
537 "voice",
538 "podcast",
539 "youtube",
540 "avatar",
541 "image",
542 "undo",
543 ];
544
545 for tool_id in registry.list() {
546 if skip_tools.contains(&tool_id) {
547 continue;
548 }
549
550 let tool = match registry.get(tool_id) {
551 Some(t) => t,
552 None => continue,
553 };
554
555 let tool_clone = Arc::clone(&tool);
556
557 self.register_tool(
558 tool.id(),
559 tool.description(),
560 tool.parameters(),
561 Arc::new(move |args: Value| {
562 let tool = Arc::clone(&tool_clone);
563 let result = tokio::task::block_in_place(|| {
564 tokio::runtime::Handle::current()
565 .block_on(async { tool.execute(args).await })
566 });
567
568 match result {
569 Ok(tool_result) => Ok(CallToolResult {
570 content: vec![ToolContent::Text {
571 text: tool_result.output,
572 }],
573 is_error: !tool_result.success,
574 }),
575 Err(e) => Ok(CallToolResult {
576 content: vec![ToolContent::Text {
577 text: e.to_string(),
578 }],
579 is_error: true,
580 }),
581 }
582 }),
583 )
584 .await;
585 }
586 }
587
588 async fn register_fallback_tools(&self) {
590 self.register_tool(
592 "run_command",
593 "Execute a shell command and return the output",
594 json!({
595 "type": "object",
596 "properties": {
597 "command": {
598 "type": "string",
599 "description": "The command to execute"
600 },
601 "cwd": {
602 "type": "string",
603 "description": "Working directory (optional)"
604 },
605 "timeout_ms": {
606 "type": "integer",
607 "description": "Timeout in milliseconds (default: 30000)"
608 }
609 },
610 "required": ["command"]
611 }),
612 Arc::new(|args| {
613 let command = args
614 .get("command")
615 .and_then(|v| v.as_str())
616 .ok_or_else(|| anyhow::anyhow!("Missing command"))?;
617
618 let cwd = args.get("cwd").and_then(|v| v.as_str());
619
620 let mut cmd = std::process::Command::new("/bin/sh");
621 cmd.arg("-c").arg(command);
622
623 if let Some(dir) = cwd {
624 cmd.current_dir(dir);
625 }
626
627 let output = cmd.output()?;
628 let stdout = String::from_utf8_lossy(&output.stdout);
629 let stderr = String::from_utf8_lossy(&output.stderr);
630
631 let result = if output.status.success() {
632 format!("{}{}", stdout, stderr)
633 } else {
634 format!(
635 "Exit code: {}\n{}{}",
636 output.status.code().unwrap_or(-1),
637 stdout,
638 stderr
639 )
640 };
641
642 Ok(CallToolResult {
643 content: vec![ToolContent::Text { text: result }],
644 is_error: !output.status.success(),
645 })
646 }),
647 )
648 .await;
649
650 self.register_tool(
652 "read_file",
653 "Read the contents of a file",
654 json!({
655 "type": "object",
656 "properties": {
657 "path": {
658 "type": "string",
659 "description": "Path to the file to read"
660 },
661 "offset": {
662 "type": "integer",
663 "description": "Line offset to start reading from (1-indexed)"
664 },
665 "limit": {
666 "type": "integer",
667 "description": "Maximum number of lines to read"
668 }
669 },
670 "required": ["path"]
671 }),
672 Arc::new(|args| {
673 let path = args
674 .get("path")
675 .and_then(|v| v.as_str())
676 .ok_or_else(|| anyhow::anyhow!("Missing path"))?;
677
678 let content = std::fs::read_to_string(path)?;
679
680 let offset = args.get("offset").and_then(|v| v.as_u64()).unwrap_or(1) as usize;
681 let limit = args.get("limit").and_then(|v| v.as_u64());
682
683 let lines: Vec<&str> = content.lines().collect();
684 let start = (offset.saturating_sub(1)).min(lines.len());
685 let end = if let Some(l) = limit {
686 (start + l as usize).min(lines.len())
687 } else {
688 lines.len()
689 };
690
691 let result = lines[start..end].join("\n");
692
693 Ok(CallToolResult {
694 content: vec![ToolContent::Text { text: result }],
695 is_error: false,
696 })
697 }),
698 )
699 .await;
700
701 self.register_tool(
703 "write_file",
704 "Write content to a file",
705 json!({
706 "type": "object",
707 "properties": {
708 "path": {
709 "type": "string",
710 "description": "Path to the file to write"
711 },
712 "content": {
713 "type": "string",
714 "description": "Content to write"
715 },
716 "create_dirs": {
717 "type": "boolean",
718 "description": "Create parent directories if they don't exist"
719 }
720 },
721 "required": ["path", "content"]
722 }),
723 Arc::new(|args| {
724 let path = args
725 .get("path")
726 .and_then(|v| v.as_str())
727 .ok_or_else(|| anyhow::anyhow!("Missing path"))?;
728
729 let content = args
730 .get("content")
731 .and_then(|v| v.as_str())
732 .ok_or_else(|| anyhow::anyhow!("Missing content"))?;
733
734 let create_dirs = args
735 .get("create_dirs")
736 .and_then(|v| v.as_bool())
737 .unwrap_or(false);
738
739 if create_dirs && let Some(parent) = std::path::Path::new(path).parent() {
740 std::fs::create_dir_all(parent)?;
741 }
742
743 std::fs::write(path, content)?;
744
745 Ok(CallToolResult {
746 content: vec![ToolContent::Text {
747 text: format!("Wrote {} bytes to {}", content.len(), path),
748 }],
749 is_error: false,
750 })
751 }),
752 )
753 .await;
754
755 self.register_tool(
757 "list_directory",
758 "List contents of a directory",
759 json!({
760 "type": "object",
761 "properties": {
762 "path": {
763 "type": "string",
764 "description": "Path to the directory"
765 },
766 "recursive": {
767 "type": "boolean",
768 "description": "List recursively"
769 },
770 "max_depth": {
771 "type": "integer",
772 "description": "Maximum depth for recursive listing"
773 }
774 },
775 "required": ["path"]
776 }),
777 Arc::new(|args| {
778 let path = args
779 .get("path")
780 .and_then(|v| v.as_str())
781 .ok_or_else(|| anyhow::anyhow!("Missing path"))?;
782
783 let mut entries = Vec::new();
784 for entry in std::fs::read_dir(path)? {
785 let entry = entry?;
786 let file_type = entry.file_type()?;
787 let name = entry.file_name().to_string_lossy().to_string();
788 let suffix = if file_type.is_dir() { "/" } else { "" };
789 entries.push(format!("{}{}", name, suffix));
790 }
791
792 entries.sort();
793
794 Ok(CallToolResult {
795 content: vec![ToolContent::Text {
796 text: entries.join("\n"),
797 }],
798 is_error: false,
799 })
800 }),
801 )
802 .await;
803
804 self.register_tool(
806 "search_files",
807 "Search for files matching a pattern",
808 json!({
809 "type": "object",
810 "properties": {
811 "pattern": {
812 "type": "string",
813 "description": "Search pattern (glob or regex)"
814 },
815 "path": {
816 "type": "string",
817 "description": "Directory to search in"
818 },
819 "content_pattern": {
820 "type": "string",
821 "description": "Pattern to search in file contents"
822 }
823 },
824 "required": ["pattern"]
825 }),
826 Arc::new(|args| {
827 let pattern = args
828 .get("pattern")
829 .and_then(|v| v.as_str())
830 .ok_or_else(|| anyhow::anyhow!("Missing pattern"))?;
831
832 let path = args.get("path").and_then(|v| v.as_str()).unwrap_or(".");
833
834 let output = std::process::Command::new("find")
836 .args([path, "-name", pattern, "-type", "f"])
837 .output()?;
838
839 let result = String::from_utf8_lossy(&output.stdout);
840
841 Ok(CallToolResult {
842 content: vec![ToolContent::Text {
843 text: result.to_string(),
844 }],
845 is_error: !output.status.success(),
846 })
847 }),
848 )
849 .await;
850
851 self.register_tool(
853 "grep_search",
854 "Search file contents using grep",
855 json!({
856 "type": "object",
857 "properties": {
858 "query": {
859 "type": "string",
860 "description": "Search pattern"
861 },
862 "path": {
863 "type": "string",
864 "description": "Directory or file to search"
865 },
866 "is_regex": {
867 "type": "boolean",
868 "description": "Treat pattern as regex"
869 },
870 "case_sensitive": {
871 "type": "boolean",
872 "description": "Case-sensitive search"
873 }
874 },
875 "required": ["query"]
876 }),
877 Arc::new(|args| {
878 let query = args
879 .get("query")
880 .and_then(|v| v.as_str())
881 .ok_or_else(|| anyhow::anyhow!("Missing query"))?;
882
883 let path = args.get("path").and_then(|v| v.as_str()).unwrap_or(".");
884
885 let is_regex = args
886 .get("is_regex")
887 .and_then(|v| v.as_bool())
888 .unwrap_or(false);
889
890 let case_sensitive = args
891 .get("case_sensitive")
892 .and_then(|v| v.as_bool())
893 .unwrap_or(false);
894
895 let mut cmd = std::process::Command::new("grep");
896 cmd.arg("-r").arg("-n");
897
898 if !case_sensitive {
899 cmd.arg("-i");
900 }
901
902 if is_regex {
903 cmd.arg("-E");
904 } else {
905 cmd.arg("-F");
906 }
907
908 cmd.arg(query).arg(path);
909
910 let output = cmd.output()?;
911 let result = String::from_utf8_lossy(&output.stdout);
912
913 Ok(CallToolResult {
914 content: vec![ToolContent::Text {
915 text: result.to_string(),
916 }],
917 is_error: false,
918 })
919 }),
920 )
921 .await;
922
923 info!("Registered {} MCP tools", self.tools.read().await.len());
924 }
925
926 async fn handle_request(&self, request: JsonRpcRequest) -> JsonRpcResponse {
928 debug!(
929 "Handling request: {} (id: {:?})",
930 request.method, request.id
931 );
932
933 let result = match request.method.as_str() {
934 "initialize" => self.handle_initialize(request.params).await,
935 "initialized" => Ok(json!({})),
936 "ping" => Ok(json!({})),
937 "tools/list" => self.handle_list_tools(request.params).await,
938 "tools/call" => self.handle_call_tool(request.params).await,
939 "resources/list" => self.handle_list_resources(request.params).await,
940 "resources/read" => self.handle_read_resource(request.params).await,
941 "prompts/list" => self.handle_list_prompts(request.params).await,
942 "prompts/get" => self.handle_get_prompt(request.params).await,
943 _ => Err(JsonRpcError::method_not_found(&request.method)),
944 };
945
946 match result {
947 Ok(value) => JsonRpcResponse::success(request.id, value),
948 Err(error) => JsonRpcResponse::error(request.id, error),
949 }
950 }
951
952 async fn handle_notification(&self, notification: JsonRpcNotification) {
954 debug!("Handling notification: {}", notification.method);
955
956 match notification.method.as_str() {
957 "notifications/initialized" => {
958 *self.initialized.write().await = true;
959 info!("MCP client initialized");
960 }
961 "notifications/cancelled" => {
962 }
964 _ => {
965 debug!("Unknown notification: {}", notification.method);
966 }
967 }
968 }
969
970 async fn handle_initialize(&self, params: Option<Value>) -> Result<Value, JsonRpcError> {
972 let _params: InitializeParams = if let Some(p) = params {
973 serde_json::from_value(p).map_err(|e| JsonRpcError::invalid_params(e.to_string()))?
974 } else {
975 return Err(JsonRpcError::invalid_params("Missing params"));
976 };
977
978 let result = InitializeResult {
979 protocol_version: PROTOCOL_VERSION.to_string(),
980 capabilities: ServerCapabilities {
981 tools: Some(ToolsCapability { list_changed: true }),
982 resources: Some(ResourcesCapability {
983 subscribe: false,
984 list_changed: true,
985 }),
986 prompts: Some(PromptsCapability { list_changed: true }),
987 logging: Some(LoggingCapability {}),
988 experimental: None,
989 },
990 server_info: self.server_info.clone(),
991 instructions: Some(
992 "CodeTether is an AI coding agent with tools for file operations, \
993 command execution, code search, and autonomous task execution. \
994 Use the swarm tool for complex tasks requiring parallel execution, \
995 and ralph for PRD-driven development."
996 .to_string(),
997 ),
998 };
999
1000 serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1001 }
1002
1003 async fn handle_list_tools(&self, _params: Option<Value>) -> Result<Value, JsonRpcError> {
1005 let metadata = self.metadata.read().await;
1007
1008 let tool_list: Vec<McpTool> = metadata
1009 .values()
1010 .map(|tm| McpTool {
1011 name: tm.name.clone(),
1012 description: tm.description.clone(),
1013 input_schema: tm.input_schema.clone(),
1014 })
1015 .collect();
1016
1017 let result = ListToolsResult {
1018 tools: tool_list,
1019 next_cursor: None,
1020 };
1021
1022 serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1023 }
1024
1025 async fn handle_call_tool(&self, params: Option<Value>) -> Result<Value, JsonRpcError> {
1027 let params: CallToolParams = if let Some(p) = params {
1028 serde_json::from_value(p).map_err(|e| JsonRpcError::invalid_params(e.to_string()))?
1029 } else {
1030 return Err(JsonRpcError::invalid_params("Missing params"));
1031 };
1032
1033 let request_id = uuid::Uuid::new_v4().to_string();
1035 let bus_handle = self.agent_bus.as_ref().map(|bus| bus.handle("mcp-server"));
1036 if let Some(ref bh) = bus_handle {
1037 bh.send(
1038 format!("tools.{}", ¶ms.name),
1039 BusMessage::ToolRequest {
1040 request_id: request_id.clone(),
1041 agent_id: "mcp-server".into(),
1042 tool_name: params.name.clone(),
1043 arguments: params.arguments.clone(),
1044 step: 0,
1045 },
1046 );
1047 }
1048
1049 let tools = self.tools.read().await;
1050 let handler = tools
1051 .get(¶ms.name)
1052 .ok_or_else(|| JsonRpcError::method_not_found(¶ms.name))?;
1053
1054 let (result_value, output_text, success) = match handler(params.arguments) {
1055 Ok(result) => {
1056 let text = result
1057 .content
1058 .iter()
1059 .filter_map(|c| match c {
1060 ToolContent::Text { text } => Some(text.as_str()),
1061 _ => None,
1062 })
1063 .collect::<Vec<_>>()
1064 .join("\n");
1065 let is_err = result.is_error;
1066 let val = serde_json::to_value(result)
1067 .map_err(|e| JsonRpcError::internal_error(e.to_string()))?;
1068 (val, text, !is_err)
1069 }
1070 Err(e) => {
1071 let err_text = e.to_string();
1072 let result = CallToolResult {
1073 content: vec![ToolContent::Text {
1074 text: err_text.clone(),
1075 }],
1076 is_error: true,
1077 };
1078 let val = serde_json::to_value(result)
1079 .map_err(|e| JsonRpcError::internal_error(e.to_string()))?;
1080 (val, err_text, false)
1081 }
1082 };
1083
1084 if let Some(ref bh) = bus_handle {
1086 bh.send(
1087 format!("tools.{}", ¶ms.name),
1088 BusMessage::ToolResponse {
1089 request_id,
1090 agent_id: "mcp-server".into(),
1091 tool_name: params.name,
1092 result: output_text,
1093 success,
1094 step: 0,
1095 },
1096 );
1097 }
1098
1099 Ok(result_value)
1100 }
1101
1102 async fn handle_list_resources(&self, _params: Option<Value>) -> Result<Value, JsonRpcError> {
1104 let metadata = self.resource_metadata.read().await;
1105 let resource_list: Vec<McpResource> = metadata
1106 .values()
1107 .map(|rm| McpResource {
1108 uri: rm.uri.clone(),
1109 name: rm.name.clone(),
1110 description: rm.description.clone(),
1111 mime_type: rm.mime_type.clone(),
1112 })
1113 .collect();
1114
1115 let result = ListResourcesResult {
1116 resources: resource_list,
1117 next_cursor: None,
1118 };
1119
1120 serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1121 }
1122
1123 async fn handle_read_resource(&self, params: Option<Value>) -> Result<Value, JsonRpcError> {
1125 let params: ReadResourceParams = if let Some(p) = params {
1126 serde_json::from_value(p).map_err(|e| JsonRpcError::invalid_params(e.to_string()))?
1127 } else {
1128 return Err(JsonRpcError::invalid_params("Missing params"));
1129 };
1130
1131 let resources = self.resources.read().await;
1132 let handler = resources
1133 .get(¶ms.uri)
1134 .ok_or_else(|| JsonRpcError::method_not_found(¶ms.uri))?;
1135
1136 match handler(params.uri.clone()) {
1137 Ok(result) => serde_json::to_value(result)
1138 .map_err(|e| JsonRpcError::internal_error(e.to_string())),
1139 Err(e) => Err(JsonRpcError::internal_error(e.to_string())),
1140 }
1141 }
1142
1143 async fn handle_list_prompts(&self, _params: Option<Value>) -> Result<Value, JsonRpcError> {
1145 let result = ListPromptsResult {
1146 prompts: vec![
1147 McpPrompt {
1148 name: "code_review".to_string(),
1149 description: Some("Review code for issues and improvements".to_string()),
1150 arguments: vec![PromptArgument {
1151 name: "file".to_string(),
1152 description: Some("File to review".to_string()),
1153 required: true,
1154 }],
1155 },
1156 McpPrompt {
1157 name: "explain_code".to_string(),
1158 description: Some("Explain what code does".to_string()),
1159 arguments: vec![PromptArgument {
1160 name: "file".to_string(),
1161 description: Some("File to explain".to_string()),
1162 required: true,
1163 }],
1164 },
1165 ],
1166 next_cursor: None,
1167 };
1168
1169 serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1170 }
1171
1172 async fn handle_get_prompt(&self, params: Option<Value>) -> Result<Value, JsonRpcError> {
1174 let params: GetPromptParams = if let Some(p) = params {
1175 serde_json::from_value(p).map_err(|e| JsonRpcError::invalid_params(e.to_string()))?
1176 } else {
1177 return Err(JsonRpcError::invalid_params("Missing params"));
1178 };
1179
1180 let result = match params.name.as_str() {
1181 "code_review" => {
1182 let file = params
1183 .arguments
1184 .get("file")
1185 .and_then(|v| v.as_str())
1186 .unwrap_or("file.rs");
1187
1188 GetPromptResult {
1189 description: Some("Code review prompt".to_string()),
1190 messages: vec![PromptMessage {
1191 role: PromptRole::User,
1192 content: PromptContent::Text {
1193 text: format!(
1194 "Please review the following code for:\n\
1195 - Bugs and potential issues\n\
1196 - Performance concerns\n\
1197 - Code style and best practices\n\
1198 - Security vulnerabilities\n\n\
1199 File: {}",
1200 file
1201 ),
1202 },
1203 }],
1204 }
1205 }
1206 "explain_code" => {
1207 let file = params
1208 .arguments
1209 .get("file")
1210 .and_then(|v| v.as_str())
1211 .unwrap_or("file.rs");
1212
1213 GetPromptResult {
1214 description: Some("Code explanation prompt".to_string()),
1215 messages: vec![PromptMessage {
1216 role: PromptRole::User,
1217 content: PromptContent::Text {
1218 text: format!(
1219 "Please explain what this code does, including:\n\
1220 - Overall purpose\n\
1221 - Key functions and their roles\n\
1222 - Data flow\n\
1223 - Important algorithms used\n\n\
1224 File: {}",
1225 file
1226 ),
1227 },
1228 }],
1229 }
1230 }
1231 _ => return Err(JsonRpcError::method_not_found(¶ms.name)),
1232 };
1233
1234 serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1235 }
1236}