1use super::bus_bridge::BusBridge;
17use super::transport::{McpMessage, StdioTransport, Transport};
18use super::types::*;
19use crate::bus::{AgentBus, BusMessage};
20use crate::tool::ToolRegistry;
21use anyhow::Result;
22use serde_json::{Value, json};
23use std::collections::HashMap;
24use std::sync::Arc;
25use tokio::sync::RwLock;
26use tracing::{debug, info, warn};
27
/// MCP server state: the transport it communicates over, the registered tool,
/// resource and prompt handlers, and the metadata advertised for them.
pub struct McpServer {
    /// Transport that `run` receives messages from and sends responses to.
    transport: Arc<dyn Transport>,
    /// Tool handlers keyed by tool name.
    tools: RwLock<HashMap<String, McpToolHandler>>,
    /// Resource handlers keyed by resource URI.
    resources: RwLock<HashMap<String, McpResourceHandler>>,
    /// Prompt handlers keyed by prompt name.
    #[allow(dead_code)]
    prompts: RwLock<HashMap<String, McpPromptHandler>>,
    /// Set to `true` when a `notifications/initialized` notification arrives.
    initialized: RwLock<bool>,
    /// Name and version reported in the `initialize` response.
    server_info: ServerInfo,
    /// Tool metadata keyed by tool name; the source for `tools/list`.
    metadata: RwLock<HashMap<String, ToolMetadata>>,
    /// Resource metadata keyed by URI; the source for `resources/list`.
    resource_metadata: RwLock<HashMap<String, ResourceMetadata>>,
    /// Bus bridge installed by `with_bus`; `None` otherwise.
    bus: Option<Arc<BusBridge>>,
    /// Optional agent bus on which `tools/call` publishes request/response messages.
    agent_bus: Option<Arc<AgentBus>>,
    /// Optional tool registry; when present `setup_tools` registers its tools
    /// instead of the fallback set.
    tool_registry: Option<Arc<ToolRegistry>>,
}
49
/// Synchronous tool callback: receives the JSON arguments of a tool call.
type McpToolHandler = Arc<dyn Fn(Value) -> Result<CallToolResult> + Send + Sync>;
/// Synchronous resource callback: receives the requested resource URI.
type McpResourceHandler = Arc<dyn Fn(String) -> Result<ReadResourceResult> + Send + Sync>;
/// Synchronous prompt callback: receives a JSON value and produces the prompt.
type McpPromptHandler = Arc<dyn Fn(Value) -> Result<GetPromptResult> + Send + Sync>;
53
54impl McpServer {
55 pub fn new_stdio() -> Self {
57 let transport: Arc<dyn Transport> = Arc::new(StdioTransport::new());
59 Self::new(transport)
60 }
61
62 pub fn new_local() -> Self {
68 let transport: Arc<dyn Transport> = Arc::new(super::transport::NullTransport::new());
69 Self::new(transport)
70 }
71
72 pub fn new(transport: Arc<dyn Transport>) -> Self {
74 let mut server = Self {
75 transport,
76 tools: RwLock::new(HashMap::new()),
77 resources: RwLock::new(HashMap::new()),
78 prompts: RwLock::new(HashMap::new()),
79 initialized: RwLock::new(false),
80 server_info: ServerInfo {
81 name: "codetether".to_string(),
82 version: env!("CARGO_PKG_VERSION").to_string(),
83 },
84 metadata: RwLock::new(HashMap::new()),
85 resource_metadata: RwLock::new(HashMap::new()),
86 bus: None,
87 agent_bus: None,
88 tool_registry: None,
89 };
90
91 server.register_default_tools();
93
94 server
95 }
96
    /// Builder-style setter: stores `registry` as the server's tool registry
    /// and returns the server.
    pub fn with_tool_registry(mut self, registry: Arc<ToolRegistry>) -> Self {
        self.tool_registry = Some(registry);
        self
    }
105
    /// Builder-style setter: stores `bus` as the server's agent bus and
    /// returns the server.
    pub fn with_agent_bus(mut self, bus: Arc<AgentBus>) -> Self {
        self.agent_bus = Some(bus);
        self
    }
114
115 pub async fn with_bus(mut self, bus_url: String) -> Self {
119 let bridge = BusBridge::new(bus_url).spawn();
120 self.bus = Some(Arc::clone(&bridge));
121 self.register_bus_tools(Arc::clone(&bridge)).await;
122 self.register_bus_resources(Arc::clone(&bridge)).await;
123 self
124 }
125
126 async fn register_bus_tools(&self, bridge: Arc<BusBridge>) {
128 let b = Arc::clone(&bridge);
130 self.register_tool(
131 "bus_events",
132 "Query recent events from the agent bus. Returns BusEnvelope JSON objects \
133 matching the optional topic filter (supports wildcards like 'ralph.*').",
134 json!({
135 "type": "object",
136 "properties": {
137 "topic_filter": {
138 "type": "string",
139 "description": "Topic pattern to filter (e.g. 'ralph.*', 'agent.*', '*'). Default: all."
140 },
141 "limit": {
142 "type": "integer",
143 "description": "Max events to return (default: 50, max: 500)"
144 }
145 }
146 }),
147 Arc::new(move |args| {
148 let topic_filter = args.get("topic_filter").and_then(|v| v.as_str()).map(String::from);
149 let limit = args
150 .get("limit")
151 .and_then(|v| v.as_u64())
152 .unwrap_or(50)
153 .min(500) as usize;
154
155 let b = Arc::clone(&b);
156 let events = tokio::task::block_in_place(|| {
157 tokio::runtime::Handle::current().block_on(async {
158 b.recent_events(topic_filter.as_deref(), limit, None).await
159 })
160 });
161
162 let text = serde_json::to_string_pretty(&events)
163 .unwrap_or_else(|_| "[]".to_string());
164
165 Ok(CallToolResult {
166 content: vec![ToolContent::Text { text }],
167 is_error: false,
168 })
169 }),
170 )
171 .await;
172
173 let b = Arc::clone(&bridge);
175 self.register_tool(
176 "bus_status",
177 "Get the current status of the bus bridge: connection state, event count, \
178 and buffer usage.",
179 json!({
180 "type": "object",
181 "properties": {}
182 }),
183 Arc::new(move |_args| {
184 let status = b.status();
185 let buffer_len = tokio::task::block_in_place(|| {
186 tokio::runtime::Handle::current().block_on(b.buffer_len())
187 });
188
189 let text = serde_json::to_string_pretty(&json!({
190 "connected": status.connected,
191 "total_received": status.total_received,
192 "buffer_used": buffer_len,
193 "buffer_capacity": status.buffer_capacity,
194 "bus_url": status.bus_url,
195 }))
196 .unwrap_or_default();
197
198 Ok(CallToolResult {
199 content: vec![ToolContent::Text { text }],
200 is_error: false,
201 })
202 }),
203 )
204 .await;
205
206 self.register_tool(
208 "ralph_status",
209 "Get current Ralph PRD status: story pass/fail states, iteration count, \
210 and progress.txt content. Reads prd.json and progress.txt from the \
211 current working directory.",
212 json!({
213 "type": "object",
214 "properties": {
215 "prd_path": {
216 "type": "string",
217 "description": "Path to prd.json (default: ./prd.json)"
218 }
219 }
220 }),
221 Arc::new(|args| {
222 let prd_path = args
223 .get("prd_path")
224 .and_then(|v| v.as_str())
225 .unwrap_or("prd.json");
226
227 let mut result = json!({});
228
229 if let Ok(content) = std::fs::read_to_string(prd_path) {
231 if let Ok(prd) = serde_json::from_str::<serde_json::Value>(&content) {
232 let stories = prd.get("user_stories").and_then(|s| s.as_array());
233 let passed = stories
234 .map(|arr| {
235 arr.iter()
236 .filter(|s| {
237 s.get("passes").and_then(|v| v.as_bool()).unwrap_or(false)
238 })
239 .count()
240 })
241 .unwrap_or(0);
242 let total = stories.map(|arr| arr.len()).unwrap_or(0);
243
244 result["prd"] = prd;
245 result["summary"] = json!({
246 "passed": passed,
247 "total": total,
248 "progress_pct": if total > 0 { (passed * 100) / total } else { 0 },
249 });
250 }
251 } else {
252 result["prd_error"] = json!("prd.json not found");
253 }
254
255 let progress_path = std::path::Path::new(prd_path)
257 .parent()
258 .unwrap_or(std::path::Path::new("."))
259 .join("progress.txt");
260 if let Ok(progress) = std::fs::read_to_string(&progress_path) {
261 result["progress"] = json!(progress);
262 }
263
264 let text = serde_json::to_string_pretty(&result).unwrap_or_default();
265
266 Ok(CallToolResult {
267 content: vec![ToolContent::Text { text }],
268 is_error: false,
269 })
270 }),
271 )
272 .await;
273
274 info!("Registered bus-aware MCP tools: bus_events, bus_status, ralph_status");
275 }
276
277 async fn register_bus_resources(&self, bridge: Arc<BusBridge>) {
279 let b = Arc::clone(&bridge);
281 self.register_resource(
282 "codetether://bus/events/recent",
283 "Recent Bus Events",
284 "Last 100 events from the agent bus (JSON array of BusEnvelope)",
285 Some("application/json"),
286 Arc::new(move |_uri| {
287 let events = tokio::task::block_in_place(|| {
288 tokio::runtime::Handle::current()
289 .block_on(async { b.recent_events(None, 100, None).await })
290 });
291 let text =
292 serde_json::to_string_pretty(&events).unwrap_or_else(|_| "[]".to_string());
293 Ok(ReadResourceResult {
294 contents: vec![ResourceContents {
295 uri: "codetether://bus/events/recent".to_string(),
296 mime_type: Some("application/json".to_string()),
297 content: ResourceContent::Text { text },
298 }],
299 })
300 }),
301 )
302 .await;
303
304 self.register_resource(
306 "codetether://ralph/prd",
307 "Ralph PRD",
308 "Current PRD JSON with story pass/fail status",
309 Some("application/json"),
310 Arc::new(|_uri| {
311 let text = std::fs::read_to_string("prd.json")
312 .unwrap_or_else(|_| r#"{"error": "prd.json not found"}"#.to_string());
313 Ok(ReadResourceResult {
314 contents: vec![ResourceContents {
315 uri: "codetether://ralph/prd".to_string(),
316 mime_type: Some("application/json".to_string()),
317 content: ResourceContent::Text { text },
318 }],
319 })
320 }),
321 )
322 .await;
323
324 self.register_resource(
326 "codetether://ralph/progress",
327 "Ralph Progress",
328 "progress.txt content from the current Ralph run",
329 Some("text/plain"),
330 Arc::new(|_uri| {
331 let text = std::fs::read_to_string("progress.txt")
332 .unwrap_or_else(|_| "(no progress.txt found)".to_string());
333 Ok(ReadResourceResult {
334 contents: vec![ResourceContents {
335 uri: "codetether://ralph/progress".to_string(),
336 mime_type: Some("text/plain".to_string()),
337 content: ResourceContent::Text { text },
338 }],
339 })
340 }),
341 )
342 .await;
343
344 info!("Registered bus-aware MCP resources");
345 }
346
    /// Called from `new`; currently has an empty body and registers nothing.
    fn register_default_tools(&mut self) {
    }
352
353 pub async fn register_tool(
355 &self,
356 name: &str,
357 description: &str,
358 input_schema: Value,
359 handler: McpToolHandler,
360 ) {
361 let metadata = ToolMetadata::new(
363 name.to_string(),
364 Some(description.to_string()),
365 input_schema.clone(),
366 );
367
368 let mut metadata_map = self.metadata.write().await;
369 metadata_map.insert(name.to_string(), metadata);
370 drop(metadata_map);
371
372 let mut tools = self.tools.write().await;
373 tools.insert(name.to_string(), handler);
374
375 debug!("Registered MCP tool: {}", name);
376 }
377
378 pub async fn register_resource(
380 &self,
381 uri: &str,
382 name: &str,
383 description: &str,
384 mime_type: Option<&str>,
385 handler: McpResourceHandler,
386 ) {
387 let metadata = ResourceMetadata::new(
389 uri.to_string(),
390 name.to_string(),
391 Some(description.to_string()),
392 mime_type.map(|s| s.to_string()),
393 );
394
395 let mut metadata_map = self.resource_metadata.write().await;
396 metadata_map.insert(uri.to_string(), metadata);
397 drop(metadata_map);
398
399 let mut resources = self.resources.write().await;
400 resources.insert(uri.to_string(), handler);
401
402 debug!("Registered MCP resource: {}", uri);
403 }
404
405 pub async fn get_tool_metadata(&self, name: &str) -> Option<ToolMetadata> {
407 let metadata = self.metadata.read().await;
408 metadata.get(name).cloned()
409 }
410
411 pub async fn get_all_tool_metadata(&self) -> Vec<ToolMetadata> {
413 let metadata = self.metadata.read().await;
414 metadata.values().cloned().collect()
415 }
416
417 pub async fn get_resource_metadata(&self, uri: &str) -> Option<ResourceMetadata> {
419 let metadata = self.resource_metadata.read().await;
420 metadata.get(uri).cloned()
421 }
422
423 pub async fn get_all_resource_metadata(&self) -> Vec<ResourceMetadata> {
425 let metadata = self.resource_metadata.read().await;
426 metadata.values().cloned().collect()
427 }
428
429 pub async fn register_prompt(&self, name: &str, handler: McpPromptHandler) {
431 let mut prompts = self.prompts.write().await;
432 prompts.insert(name.to_string(), handler);
433 debug!("Registered MCP prompt: {}", name);
434 }
435
436 pub async fn get_prompt_handler(&self, name: &str) -> Option<McpPromptHandler> {
438 let prompts = self.prompts.read().await;
439 prompts.get(name).cloned()
440 }
441
442 pub async fn list_prompts(&self) -> Vec<String> {
444 let prompts = self.prompts.read().await;
445 prompts.keys().cloned().collect()
446 }
447
448 pub async fn run(&self) -> Result<()> {
450 info!("Starting MCP server...");
451
452 self.setup_tools().await;
454
455 loop {
456 match self.transport.receive().await? {
457 Some(McpMessage::Request(request)) => {
458 let response = self.handle_request(request).await;
459 self.transport.send_response(response).await?;
460 }
461 Some(McpMessage::Notification(notification)) => {
462 self.handle_notification(notification).await;
463 }
464 Some(McpMessage::Response(response)) => {
465 warn!("Unexpected response received: {:?}", response.id);
467 }
468 None => {
469 info!("Transport closed, shutting down MCP server");
470 break;
471 }
472 }
473 }
474
475 Ok(())
476 }
477
    /// Public entry point that runs the same tool setup `run` performs,
    /// by delegating to `setup_tools`.
    pub async fn setup_tools_public(&self) {
        self.setup_tools().await;
    }
482
483 pub async fn call_tool_direct(&self, name: &str, arguments: Value) -> Result<CallToolResult> {
485 let tools = self.tools.read().await;
486 let handler = tools
487 .get(name)
488 .ok_or_else(|| anyhow::anyhow!("Tool not found: {}", name))?
489 .clone();
490 drop(tools);
491 handler(arguments)
492 }
493
494 async fn setup_tools(&self) {
496 if let Some(registry) = &self.tool_registry {
498 self.register_registry_tools(registry).await;
499 info!(
500 "Registered {} MCP tools from ToolRegistry",
501 self.tools.read().await.len()
502 );
503 return;
504 }
505
506 self.register_fallback_tools().await;
508 info!(
509 "Registered {} MCP tools (fallback)",
510 self.tools.read().await.len()
511 );
512 }
513
514 async fn register_registry_tools(&self, registry: &Arc<ToolRegistry>) {
516 let skip_tools = [
525 "question",
526 "invalid",
527 "batch",
528 "confirm_edit",
529 "confirm_multiedit",
530 "plan_enter",
531 "plan_exit",
532 "voice",
533 "podcast",
534 "youtube",
535 "avatar",
536 "image",
537 "undo",
538 ];
539
540 for tool_id in registry.list() {
541 if skip_tools.contains(&tool_id) {
542 continue;
543 }
544
545 let tool = match registry.get(tool_id) {
546 Some(t) => t,
547 None => continue,
548 };
549
550 let tool_clone = Arc::clone(&tool);
551
552 self.register_tool(
553 tool.id(),
554 tool.description(),
555 tool.parameters(),
556 Arc::new(move |args: Value| {
557 let tool = Arc::clone(&tool_clone);
558 let result = tokio::task::block_in_place(|| {
559 tokio::runtime::Handle::current()
560 .block_on(async { tool.execute(args).await })
561 });
562
563 match result {
564 Ok(tool_result) => Ok(CallToolResult {
565 content: vec![ToolContent::Text {
566 text: tool_result.output,
567 }],
568 is_error: !tool_result.success,
569 }),
570 Err(e) => Ok(CallToolResult {
571 content: vec![ToolContent::Text {
572 text: e.to_string(),
573 }],
574 is_error: true,
575 }),
576 }
577 }),
578 )
579 .await;
580 }
581 }
582
583 async fn register_fallback_tools(&self) {
585 self.register_tool(
587 "run_command",
588 "Execute a shell command and return the output",
589 json!({
590 "type": "object",
591 "properties": {
592 "command": {
593 "type": "string",
594 "description": "The command to execute"
595 },
596 "cwd": {
597 "type": "string",
598 "description": "Working directory (optional)"
599 },
600 "timeout_ms": {
601 "type": "integer",
602 "description": "Timeout in milliseconds (default: 30000)"
603 }
604 },
605 "required": ["command"]
606 }),
607 Arc::new(|args| {
608 let command = args
609 .get("command")
610 .and_then(|v| v.as_str())
611 .ok_or_else(|| anyhow::anyhow!("Missing command"))?;
612
613 let cwd = args.get("cwd").and_then(|v| v.as_str());
614
615 let mut cmd = std::process::Command::new("/bin/sh");
616 cmd.arg("-c").arg(command);
617
618 if let Some(dir) = cwd {
619 cmd.current_dir(dir);
620 }
621
622 let output = cmd.output()?;
623 let stdout = String::from_utf8_lossy(&output.stdout);
624 let stderr = String::from_utf8_lossy(&output.stderr);
625
626 let result = if output.status.success() {
627 format!("{}{}", stdout, stderr)
628 } else {
629 format!(
630 "Exit code: {}\n{}{}",
631 output.status.code().unwrap_or(-1),
632 stdout,
633 stderr
634 )
635 };
636
637 Ok(CallToolResult {
638 content: vec![ToolContent::Text { text: result }],
639 is_error: !output.status.success(),
640 })
641 }),
642 )
643 .await;
644
645 self.register_tool(
647 "read_file",
648 "Read the contents of a file",
649 json!({
650 "type": "object",
651 "properties": {
652 "path": {
653 "type": "string",
654 "description": "Path to the file to read"
655 },
656 "offset": {
657 "type": "integer",
658 "description": "Line offset to start reading from (1-indexed)"
659 },
660 "limit": {
661 "type": "integer",
662 "description": "Maximum number of lines to read"
663 }
664 },
665 "required": ["path"]
666 }),
667 Arc::new(|args| {
668 let path = args
669 .get("path")
670 .and_then(|v| v.as_str())
671 .ok_or_else(|| anyhow::anyhow!("Missing path"))?;
672
673 let content = std::fs::read_to_string(path)?;
674
675 let offset = args.get("offset").and_then(|v| v.as_u64()).unwrap_or(1) as usize;
676 let limit = args.get("limit").and_then(|v| v.as_u64());
677
678 let lines: Vec<&str> = content.lines().collect();
679 let start = (offset.saturating_sub(1)).min(lines.len());
680 let end = if let Some(l) = limit {
681 (start + l as usize).min(lines.len())
682 } else {
683 lines.len()
684 };
685
686 let result = lines[start..end].join("\n");
687
688 Ok(CallToolResult {
689 content: vec![ToolContent::Text { text: result }],
690 is_error: false,
691 })
692 }),
693 )
694 .await;
695
696 self.register_tool(
698 "write_file",
699 "Write content to a file",
700 json!({
701 "type": "object",
702 "properties": {
703 "path": {
704 "type": "string",
705 "description": "Path to the file to write"
706 },
707 "content": {
708 "type": "string",
709 "description": "Content to write"
710 },
711 "create_dirs": {
712 "type": "boolean",
713 "description": "Create parent directories if they don't exist"
714 }
715 },
716 "required": ["path", "content"]
717 }),
718 Arc::new(|args| {
719 let path = args
720 .get("path")
721 .and_then(|v| v.as_str())
722 .ok_or_else(|| anyhow::anyhow!("Missing path"))?;
723
724 let content = args
725 .get("content")
726 .and_then(|v| v.as_str())
727 .ok_or_else(|| anyhow::anyhow!("Missing content"))?;
728
729 let create_dirs = args
730 .get("create_dirs")
731 .and_then(|v| v.as_bool())
732 .unwrap_or(false);
733
734 if create_dirs {
735 if let Some(parent) = std::path::Path::new(path).parent() {
736 std::fs::create_dir_all(parent)?;
737 }
738 }
739
740 std::fs::write(path, content)?;
741
742 Ok(CallToolResult {
743 content: vec![ToolContent::Text {
744 text: format!("Wrote {} bytes to {}", content.len(), path),
745 }],
746 is_error: false,
747 })
748 }),
749 )
750 .await;
751
752 self.register_tool(
754 "list_directory",
755 "List contents of a directory",
756 json!({
757 "type": "object",
758 "properties": {
759 "path": {
760 "type": "string",
761 "description": "Path to the directory"
762 },
763 "recursive": {
764 "type": "boolean",
765 "description": "List recursively"
766 },
767 "max_depth": {
768 "type": "integer",
769 "description": "Maximum depth for recursive listing"
770 }
771 },
772 "required": ["path"]
773 }),
774 Arc::new(|args| {
775 let path = args
776 .get("path")
777 .and_then(|v| v.as_str())
778 .ok_or_else(|| anyhow::anyhow!("Missing path"))?;
779
780 let mut entries = Vec::new();
781 for entry in std::fs::read_dir(path)? {
782 let entry = entry?;
783 let file_type = entry.file_type()?;
784 let name = entry.file_name().to_string_lossy().to_string();
785 let suffix = if file_type.is_dir() { "/" } else { "" };
786 entries.push(format!("{}{}", name, suffix));
787 }
788
789 entries.sort();
790
791 Ok(CallToolResult {
792 content: vec![ToolContent::Text {
793 text: entries.join("\n"),
794 }],
795 is_error: false,
796 })
797 }),
798 )
799 .await;
800
801 self.register_tool(
803 "search_files",
804 "Search for files matching a pattern",
805 json!({
806 "type": "object",
807 "properties": {
808 "pattern": {
809 "type": "string",
810 "description": "Search pattern (glob or regex)"
811 },
812 "path": {
813 "type": "string",
814 "description": "Directory to search in"
815 },
816 "content_pattern": {
817 "type": "string",
818 "description": "Pattern to search in file contents"
819 }
820 },
821 "required": ["pattern"]
822 }),
823 Arc::new(|args| {
824 let pattern = args
825 .get("pattern")
826 .and_then(|v| v.as_str())
827 .ok_or_else(|| anyhow::anyhow!("Missing pattern"))?;
828
829 let path = args.get("path").and_then(|v| v.as_str()).unwrap_or(".");
830
831 let output = std::process::Command::new("find")
833 .args([path, "-name", pattern, "-type", "f"])
834 .output()?;
835
836 let result = String::from_utf8_lossy(&output.stdout);
837
838 Ok(CallToolResult {
839 content: vec![ToolContent::Text {
840 text: result.to_string(),
841 }],
842 is_error: !output.status.success(),
843 })
844 }),
845 )
846 .await;
847
848 self.register_tool(
850 "grep_search",
851 "Search file contents using grep",
852 json!({
853 "type": "object",
854 "properties": {
855 "query": {
856 "type": "string",
857 "description": "Search pattern"
858 },
859 "path": {
860 "type": "string",
861 "description": "Directory or file to search"
862 },
863 "is_regex": {
864 "type": "boolean",
865 "description": "Treat pattern as regex"
866 },
867 "case_sensitive": {
868 "type": "boolean",
869 "description": "Case-sensitive search"
870 }
871 },
872 "required": ["query"]
873 }),
874 Arc::new(|args| {
875 let query = args
876 .get("query")
877 .and_then(|v| v.as_str())
878 .ok_or_else(|| anyhow::anyhow!("Missing query"))?;
879
880 let path = args.get("path").and_then(|v| v.as_str()).unwrap_or(".");
881
882 let is_regex = args
883 .get("is_regex")
884 .and_then(|v| v.as_bool())
885 .unwrap_or(false);
886
887 let case_sensitive = args
888 .get("case_sensitive")
889 .and_then(|v| v.as_bool())
890 .unwrap_or(false);
891
892 let mut cmd = std::process::Command::new("grep");
893 cmd.arg("-r").arg("-n");
894
895 if !case_sensitive {
896 cmd.arg("-i");
897 }
898
899 if is_regex {
900 cmd.arg("-E");
901 } else {
902 cmd.arg("-F");
903 }
904
905 cmd.arg(query).arg(path);
906
907 let output = cmd.output()?;
908 let result = String::from_utf8_lossy(&output.stdout);
909
910 Ok(CallToolResult {
911 content: vec![ToolContent::Text {
912 text: result.to_string(),
913 }],
914 is_error: false,
915 })
916 }),
917 )
918 .await;
919
920 info!("Registered {} MCP tools", self.tools.read().await.len());
921 }
922
923 async fn handle_request(&self, request: JsonRpcRequest) -> JsonRpcResponse {
925 debug!(
926 "Handling request: {} (id: {:?})",
927 request.method, request.id
928 );
929
930 let result = match request.method.as_str() {
931 "initialize" => self.handle_initialize(request.params).await,
932 "initialized" => Ok(json!({})),
933 "ping" => Ok(json!({})),
934 "tools/list" => self.handle_list_tools(request.params).await,
935 "tools/call" => self.handle_call_tool(request.params).await,
936 "resources/list" => self.handle_list_resources(request.params).await,
937 "resources/read" => self.handle_read_resource(request.params).await,
938 "prompts/list" => self.handle_list_prompts(request.params).await,
939 "prompts/get" => self.handle_get_prompt(request.params).await,
940 _ => Err(JsonRpcError::method_not_found(&request.method)),
941 };
942
943 match result {
944 Ok(value) => JsonRpcResponse::success(request.id, value),
945 Err(error) => JsonRpcResponse::error(request.id, error),
946 }
947 }
948
949 async fn handle_notification(&self, notification: JsonRpcNotification) {
951 debug!("Handling notification: {}", notification.method);
952
953 match notification.method.as_str() {
954 "notifications/initialized" => {
955 *self.initialized.write().await = true;
956 info!("MCP client initialized");
957 }
958 "notifications/cancelled" => {
959 }
961 _ => {
962 debug!("Unknown notification: {}", notification.method);
963 }
964 }
965 }
966
967 async fn handle_initialize(&self, params: Option<Value>) -> Result<Value, JsonRpcError> {
969 let _params: InitializeParams = if let Some(p) = params {
970 serde_json::from_value(p).map_err(|e| JsonRpcError::invalid_params(e.to_string()))?
971 } else {
972 return Err(JsonRpcError::invalid_params("Missing params"));
973 };
974
975 let result = InitializeResult {
976 protocol_version: PROTOCOL_VERSION.to_string(),
977 capabilities: ServerCapabilities {
978 tools: Some(ToolsCapability { list_changed: true }),
979 resources: Some(ResourcesCapability {
980 subscribe: false,
981 list_changed: true,
982 }),
983 prompts: Some(PromptsCapability { list_changed: true }),
984 logging: Some(LoggingCapability {}),
985 experimental: None,
986 },
987 server_info: self.server_info.clone(),
988 instructions: Some(
989 "CodeTether is an AI coding agent with tools for file operations, \
990 command execution, code search, and autonomous task execution. \
991 Use the swarm tool for complex tasks requiring parallel execution, \
992 and ralph for PRD-driven development."
993 .to_string(),
994 ),
995 };
996
997 serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
998 }
999
1000 async fn handle_list_tools(&self, _params: Option<Value>) -> Result<Value, JsonRpcError> {
1002 let metadata = self.metadata.read().await;
1004
1005 let tool_list: Vec<McpTool> = metadata
1006 .values()
1007 .map(|tm| McpTool {
1008 name: tm.name.clone(),
1009 description: tm.description.clone(),
1010 input_schema: tm.input_schema.clone(),
1011 })
1012 .collect();
1013
1014 let result = ListToolsResult {
1015 tools: tool_list,
1016 next_cursor: None,
1017 };
1018
1019 serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1020 }
1021
1022 async fn handle_call_tool(&self, params: Option<Value>) -> Result<Value, JsonRpcError> {
1024 let params: CallToolParams = if let Some(p) = params {
1025 serde_json::from_value(p).map_err(|e| JsonRpcError::invalid_params(e.to_string()))?
1026 } else {
1027 return Err(JsonRpcError::invalid_params("Missing params"));
1028 };
1029
1030 let request_id = uuid::Uuid::new_v4().to_string();
1032 let bus_handle = self.agent_bus.as_ref().map(|bus| bus.handle("mcp-server"));
1033 if let Some(ref bh) = bus_handle {
1034 bh.send(
1035 format!("tools.{}", ¶ms.name),
1036 BusMessage::ToolRequest {
1037 request_id: request_id.clone(),
1038 agent_id: "mcp-server".into(),
1039 tool_name: params.name.clone(),
1040 arguments: params.arguments.clone(),
1041 },
1042 );
1043 }
1044
1045 let tools = self.tools.read().await;
1046 let handler = tools
1047 .get(¶ms.name)
1048 .ok_or_else(|| JsonRpcError::method_not_found(¶ms.name))?;
1049
1050 let (result_value, output_text, success) = match handler(params.arguments) {
1051 Ok(result) => {
1052 let text = result
1053 .content
1054 .iter()
1055 .filter_map(|c| match c {
1056 ToolContent::Text { text } => Some(text.as_str()),
1057 _ => None,
1058 })
1059 .collect::<Vec<_>>()
1060 .join("\n");
1061 let is_err = result.is_error;
1062 let val = serde_json::to_value(result)
1063 .map_err(|e| JsonRpcError::internal_error(e.to_string()))?;
1064 (val, text, !is_err)
1065 }
1066 Err(e) => {
1067 let err_text = e.to_string();
1068 let result = CallToolResult {
1069 content: vec![ToolContent::Text {
1070 text: err_text.clone(),
1071 }],
1072 is_error: true,
1073 };
1074 let val = serde_json::to_value(result)
1075 .map_err(|e| JsonRpcError::internal_error(e.to_string()))?;
1076 (val, err_text, false)
1077 }
1078 };
1079
1080 if let Some(ref bh) = bus_handle {
1082 bh.send(
1083 format!("tools.{}", ¶ms.name),
1084 BusMessage::ToolResponse {
1085 request_id,
1086 agent_id: "mcp-server".into(),
1087 tool_name: params.name,
1088 result: output_text,
1089 success,
1090 },
1091 );
1092 }
1093
1094 Ok(result_value)
1095 }
1096
1097 async fn handle_list_resources(&self, _params: Option<Value>) -> Result<Value, JsonRpcError> {
1099 let metadata = self.resource_metadata.read().await;
1100 let resource_list: Vec<McpResource> = metadata
1101 .values()
1102 .map(|rm| McpResource {
1103 uri: rm.uri.clone(),
1104 name: rm.name.clone(),
1105 description: rm.description.clone(),
1106 mime_type: rm.mime_type.clone(),
1107 })
1108 .collect();
1109
1110 let result = ListResourcesResult {
1111 resources: resource_list,
1112 next_cursor: None,
1113 };
1114
1115 serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1116 }
1117
1118 async fn handle_read_resource(&self, params: Option<Value>) -> Result<Value, JsonRpcError> {
1120 let params: ReadResourceParams = if let Some(p) = params {
1121 serde_json::from_value(p).map_err(|e| JsonRpcError::invalid_params(e.to_string()))?
1122 } else {
1123 return Err(JsonRpcError::invalid_params("Missing params"));
1124 };
1125
1126 let resources = self.resources.read().await;
1127 let handler = resources
1128 .get(¶ms.uri)
1129 .ok_or_else(|| JsonRpcError::method_not_found(¶ms.uri))?;
1130
1131 match handler(params.uri.clone()) {
1132 Ok(result) => serde_json::to_value(result)
1133 .map_err(|e| JsonRpcError::internal_error(e.to_string())),
1134 Err(e) => Err(JsonRpcError::internal_error(e.to_string())),
1135 }
1136 }
1137
1138 async fn handle_list_prompts(&self, _params: Option<Value>) -> Result<Value, JsonRpcError> {
1140 let result = ListPromptsResult {
1141 prompts: vec![
1142 McpPrompt {
1143 name: "code_review".to_string(),
1144 description: Some("Review code for issues and improvements".to_string()),
1145 arguments: vec![PromptArgument {
1146 name: "file".to_string(),
1147 description: Some("File to review".to_string()),
1148 required: true,
1149 }],
1150 },
1151 McpPrompt {
1152 name: "explain_code".to_string(),
1153 description: Some("Explain what code does".to_string()),
1154 arguments: vec![PromptArgument {
1155 name: "file".to_string(),
1156 description: Some("File to explain".to_string()),
1157 required: true,
1158 }],
1159 },
1160 ],
1161 next_cursor: None,
1162 };
1163
1164 serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1165 }
1166
1167 async fn handle_get_prompt(&self, params: Option<Value>) -> Result<Value, JsonRpcError> {
1169 let params: GetPromptParams = if let Some(p) = params {
1170 serde_json::from_value(p).map_err(|e| JsonRpcError::invalid_params(e.to_string()))?
1171 } else {
1172 return Err(JsonRpcError::invalid_params("Missing params"));
1173 };
1174
1175 let result = match params.name.as_str() {
1176 "code_review" => {
1177 let file = params
1178 .arguments
1179 .get("file")
1180 .and_then(|v| v.as_str())
1181 .unwrap_or("file.rs");
1182
1183 GetPromptResult {
1184 description: Some("Code review prompt".to_string()),
1185 messages: vec![PromptMessage {
1186 role: PromptRole::User,
1187 content: PromptContent::Text {
1188 text: format!(
1189 "Please review the following code for:\n\
1190 - Bugs and potential issues\n\
1191 - Performance concerns\n\
1192 - Code style and best practices\n\
1193 - Security vulnerabilities\n\n\
1194 File: {}",
1195 file
1196 ),
1197 },
1198 }],
1199 }
1200 }
1201 "explain_code" => {
1202 let file = params
1203 .arguments
1204 .get("file")
1205 .and_then(|v| v.as_str())
1206 .unwrap_or("file.rs");
1207
1208 GetPromptResult {
1209 description: Some("Code explanation prompt".to_string()),
1210 messages: vec![PromptMessage {
1211 role: PromptRole::User,
1212 content: PromptContent::Text {
1213 text: format!(
1214 "Please explain what this code does, including:\n\
1215 - Overall purpose\n\
1216 - Key functions and their roles\n\
1217 - Data flow\n\
1218 - Important algorithms used\n\n\
1219 File: {}",
1220 file
1221 ),
1222 },
1223 }],
1224 }
1225 }
1226 _ => return Err(JsonRpcError::method_not_found(¶ms.name)),
1227 };
1228
1229 serde_json::to_value(result).map_err(|e| JsonRpcError::internal_error(e.to_string()))
1230 }
1231}