Skip to main content

synwire_agent/mcp/
in_process.rs

1//! In-process MCP server created from native tool definitions.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::sync::atomic::{AtomicU64, Ordering};
6
7use serde_json::Value;
8use tokio::sync::{Mutex, RwLock};
9
10use synwire_core::BoxFuture;
11use synwire_core::agents::error::AgentError;
12use synwire_core::mcp::traits::{
13    McpConnectionState, McpServerStatus, McpToolDescriptor, McpTransport,
14};
15use synwire_core::tools::Tool;
16
17/// Handler function stored per tool name.
18type ToolHandler =
19    Arc<dyn Fn(Value) -> BoxFuture<'static, Result<Value, AgentError>> + Send + Sync>;
20
21/// In-process MCP server that dispatches calls to registered `Tool` objects.
22pub struct InProcessMcpTransport {
23    name: String,
24    descriptors: Vec<McpToolDescriptor>,
25    handlers: Arc<RwLock<HashMap<String, ToolHandler>>>,
26    state: Mutex<McpConnectionState>,
27    calls_succeeded: AtomicU64,
28    calls_failed: AtomicU64,
29}
30
31impl InProcessMcpTransport {
32    /// Create a new in-process transport, registering all provided tools.
33    #[must_use]
34    pub fn new(name: impl Into<String>) -> Self {
35        Self {
36            name: name.into(),
37            descriptors: Vec::new(),
38            handlers: Arc::new(RwLock::new(HashMap::new())),
39            state: Mutex::new(McpConnectionState::Disconnected),
40            calls_succeeded: AtomicU64::new(0),
41            calls_failed: AtomicU64::new(0),
42        }
43    }
44
45    /// Register a tool with this in-process server.
46    pub async fn register<T: Tool + Send + Sync + 'static>(&mut self, tool: Arc<T>) {
47        let descriptor = McpToolDescriptor {
48            name: tool.name().to_string(),
49            description: tool.description().to_string(),
50            input_schema: tool.schema().parameters.clone(),
51        };
52        self.descriptors.push(descriptor.clone());
53
54        let handler: ToolHandler = Arc::new(move |args: Value| {
55            let tool = Arc::clone(&tool);
56            Box::pin(async move {
57                let output = tool
58                    .invoke(args)
59                    .await
60                    .map_err(|e| AgentError::Tool(e.to_string()))?;
61                serde_json::to_value(output)
62                    .map_err(|e| AgentError::Tool(format!("serialization failed: {e}")))
63            })
64        });
65
66        let _ = self.handlers.write().await.insert(descriptor.name, handler);
67    }
68}
69
70impl std::fmt::Debug for InProcessMcpTransport {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        f.debug_struct("InProcessMcpTransport")
73            .field("name", &self.name)
74            .field("descriptors", &self.descriptors)
75            .finish_non_exhaustive()
76    }
77}
78
79impl McpTransport for InProcessMcpTransport {
80    fn connect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
81        Box::pin(async move {
82            *self.state.lock().await = McpConnectionState::Connected;
83            tracing::debug!(server = %self.name, "In-process MCP server connected");
84            Ok(())
85        })
86    }
87
88    fn reconnect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
89        self.connect()
90    }
91
92    fn status(&self) -> BoxFuture<'_, McpServerStatus> {
93        Box::pin(async move {
94            McpServerStatus {
95                name: self.name.clone(),
96                state: *self.state.lock().await,
97                calls_succeeded: self.calls_succeeded.load(Ordering::Relaxed),
98                calls_failed: self.calls_failed.load(Ordering::Relaxed),
99                enabled: true,
100            }
101        })
102    }
103
104    fn list_tools(&self) -> BoxFuture<'_, Result<Vec<McpToolDescriptor>, AgentError>> {
105        let descriptors = self.descriptors.clone();
106        Box::pin(async move { Ok(descriptors) })
107    }
108
109    fn call_tool(
110        &self,
111        tool_name: &str,
112        arguments: Value,
113    ) -> BoxFuture<'_, Result<Value, AgentError>> {
114        let tool_name = tool_name.to_string();
115        let handlers = Arc::clone(&self.handlers);
116        Box::pin(async move {
117            let guard = handlers.read().await;
118            let handler = guard
119                .get(&tool_name)
120                .ok_or_else(|| AgentError::Tool(format!("Unknown in-process tool: {tool_name}")))?;
121            let fut = handler(arguments);
122            drop(guard);
123            let result = fut.await;
124            match &result {
125                Ok(_) => {
126                    let _ = self.calls_succeeded.fetch_add(1, Ordering::Relaxed);
127                }
128                Err(_) => {
129                    let _ = self.calls_failed.fetch_add(1, Ordering::Relaxed);
130                }
131            }
132            result
133        })
134    }
135
136    fn disconnect(&self) -> BoxFuture<'_, Result<(), AgentError>> {
137        Box::pin(async move {
138            *self.state.lock().await = McpConnectionState::Shutdown;
139            Ok(())
140        })
141    }
142}