synwire_agent/mcp/
in_process.rs1use std::collections::HashMap;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6
7use serde_json::Value;
8use tokio::sync::{Mutex, RwLock};
9
10use synwire_core::BoxFuture;
11use synwire_core::agents::error::AgentError;
12use synwire_core::mcp::traits::{
13 McpConnectionState, McpServerStatus, McpToolDescriptor, McpTransport,
14};
15use synwire_core::tools::Tool;
16
17type ToolHandler =
19 Arc<dyn Fn(Value) -> BoxFuture<'static, Result<Value, AgentError>> + Send + Sync>;
20
21pub struct InProcessMcpTransport {
23 name: String,
24 descriptors: Vec<McpToolDescriptor>,
25 handlers: Arc<RwLock<HashMap<String, ToolHandler>>>,
26 state: Mutex<McpConnectionState>,
27 calls_succeeded: AtomicU64,
28 calls_failed: AtomicU64,
29}
30
31impl InProcessMcpTransport {
32 #[must_use]
34 pub fn new(name: impl Into<String>) -> Self {
35 Self {
36 name: name.into(),
37 descriptors: Vec::new(),
38 handlers: Arc::new(RwLock::new(HashMap::new())),
39 state: Mutex::new(McpConnectionState::Disconnected),
40 calls_succeeded: AtomicU64::new(0),
41 calls_failed: AtomicU64::new(0),
42 }
43 }
44
45 pub async fn register<T: Tool + Send + Sync + 'static>(&mut self, tool: Arc<T>) {
47 let descriptor = McpToolDescriptor {
48 name: tool.name().to_string(),
49 description: tool.description().to_string(),
50 input_schema: tool.schema().parameters.clone(),
51 };
52 self.descriptors.push(descriptor.clone());
53
54 let handler: ToolHandler = Arc::new(move |args: Value| {
55 let tool = Arc::clone(&tool);
56 Box::pin(async move {
57 let output = tool
58 .invoke(args)
59 .await
60 .map_err(|e| AgentError::Tool(e.to_string()))?;
61 serde_json::to_value(output)
62 .map_err(|e| AgentError::Tool(format!("serialization failed: {e}")))
63 })
64 });
65
66 let _ = self.handlers.write().await.insert(descriptor.name, handler);
67 }
68}
69
70impl std::fmt::Debug for InProcessMcpTransport {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 f.debug_struct("InProcessMcpTransport")
73 .field("name", &self.name)
74 .field("descriptors", &self.descriptors)
75 .finish_non_exhaustive()
76 }
77}
78
79impl McpTransport for InProcessMcpTransport {
80 fn connect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
81 Box::pin(async move {
82 *self.state.lock().await = McpConnectionState::Connected;
83 tracing::debug!(server = %self.name, "In-process MCP server connected");
84 Ok(())
85 })
86 }
87
88 fn reconnect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
89 self.connect()
90 }
91
92 fn status(&self) -> BoxFuture<'_, McpServerStatus> {
93 Box::pin(async move {
94 McpServerStatus {
95 name: self.name.clone(),
96 state: *self.state.lock().await,
97 calls_succeeded: self.calls_succeeded.load(Ordering::Relaxed),
98 calls_failed: self.calls_failed.load(Ordering::Relaxed),
99 enabled: true,
100 }
101 })
102 }
103
104 fn list_tools(&self) -> BoxFuture<'_, Result<Vec<McpToolDescriptor>, AgentError>> {
105 let descriptors = self.descriptors.clone();
106 Box::pin(async move { Ok(descriptors) })
107 }
108
109 fn call_tool(
110 &self,
111 tool_name: &str,
112 arguments: Value,
113 ) -> BoxFuture<'_, Result<Value, AgentError>> {
114 let tool_name = tool_name.to_string();
115 let handlers = Arc::clone(&self.handlers);
116 Box::pin(async move {
117 let guard = handlers.read().await;
118 let handler = guard
119 .get(&tool_name)
120 .ok_or_else(|| AgentError::Tool(format!("Unknown in-process tool: {tool_name}")))?;
121 let fut = handler(arguments);
122 drop(guard);
123 let result = fut.await;
124 match &result {
125 Ok(_) => {
126 let _ = self.calls_succeeded.fetch_add(1, Ordering::Relaxed);
127 }
128 Err(_) => {
129 let _ = self.calls_failed.fetch_add(1, Ordering::Relaxed);
130 }
131 }
132 result
133 })
134 }
135
136 fn disconnect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
137 Box::pin(async move {
138 *self.state.lock().await = McpConnectionState::Shutdown;
139 Ok(())
140 })
141 }
142}