Skip to main content

mcp_rtk/
proxy.rs

1//! MCP proxy server and client handlers.
2//!
3//! This module implements the core proxy logic: [`ProxyServer`] acts as an MCP
4//! server for Claude (served over stdio), forwarding every request to the
5//! upstream MCP server via [`ProxyClient`]. Tool call responses pass through the
6//! [`FilterEngine`] before being returned to Claude, compressing JSON payloads
7//! by 60-90%.
8//!
9//! # Architecture
10//!
11//! ```text
12//! Claude <-(stdio)-> ProxyServer <-(child-process)-> upstream MCP server
13//!                       |                              ^
14//!                  FilterEngine                   ProxyClient
15//! ```
16//!
17//! The upstream connection is established lazily: the stdio server starts
18//! immediately so Claude Code can complete its MCP handshake, while the
19//! upstream connection is initialized in the background.
20
21use std::sync::Arc;
22
23use arc_swap::ArcSwap;
24use rmcp::handler::client::ClientHandler;
25use rmcp::handler::server::ServerHandler;
26use rmcp::model::*;
27use rmcp::service::{Peer, RequestContext, RoleClient, RoleServer};
28use rmcp::Error as McpError;
29
30use crate::filter::FilterEngine;
31use crate::tracking::Tracker;
32
33/// MCP server handler that proxies requests to an upstream MCP server.
34///
35/// `ProxyServer` intercepts every `call_tool` response and applies the
36/// [`FilterEngine`] pipeline before returning results to Claude, while
37/// `list_tools` responses are forwarded as-is.
38///
39/// The filter engine is held behind an [`ArcSwap`] so it can be atomically
40/// replaced at runtime when external presets change on disk (hot reload).
41///
42/// # Examples
43///
44/// ```no_run
45/// # use std::sync::Arc;
46/// # use arc_swap::ArcSwap;
47/// # use mcp_rtk::config::Config;
48/// # use mcp_rtk::filter::FilterEngine;
49/// # use mcp_rtk::proxy::ProxyServer;
50/// # use rmcp::service::{Peer, RoleClient};
51/// let config = Arc::new(Config::from_upstream(&["npx", "some-mcp"], None).unwrap());
52/// let engine = Arc::new(ArcSwap::from(Arc::new(FilterEngine::new(config))));
53/// // let proxy = ProxyServer::new(engine, None, upstream_peer);
54/// ```
55#[derive(Clone)]
56pub struct ProxyServer {
57    /// Handle to the upstream MCP server.
58    upstream: Peer<RoleClient>,
59    /// The filter engine, atomically swappable for hot reload.
60    filter: Arc<ArcSwap<FilterEngine>>,
61    /// Optional token-savings tracker (SQLite-backed).
62    tracker: Option<Arc<Tracker>>,
63    /// Peer handle for the downstream (Claude) connection.
64    peer: Option<Peer<RoleServer>>,
65}
66
67impl ProxyServer {
68    /// Create a new proxy server with an already-connected upstream peer.
69    ///
70    /// # Arguments
71    ///
72    /// * `engine` — The shared, hot-reloadable filter engine.
73    /// * `tracker` — Optional [`Tracker`] for recording token savings.
74    /// * `upstream` — Peer handle to the upstream MCP server.
75    pub fn new(
76        engine: Arc<ArcSwap<FilterEngine>>,
77        tracker: Option<Arc<Tracker>>,
78        upstream: Peer<RoleClient>,
79    ) -> Self {
80        Self {
81            upstream,
82            filter: engine,
83            tracker,
84            peer: None,
85        }
86    }
87}
88
89impl ServerHandler for ProxyServer {
90    fn get_info(&self) -> ServerInfo {
91        ServerInfo {
92            protocol_version: Default::default(),
93            capabilities: ServerCapabilities {
94                tools: Some(ToolsCapability {
95                    list_changed: Some(true),
96                }),
97                ..Default::default()
98            },
99            server_info: Implementation {
100                name: "mcp-rtk".into(),
101                version: env!("CARGO_PKG_VERSION").into(),
102            },
103            instructions: Some("Token-optimizing MCP proxy".into()),
104        }
105    }
106
107    fn get_peer(&self) -> Option<Peer<RoleServer>> {
108        self.peer.clone()
109    }
110
111    fn set_peer(&mut self, peer: Peer<RoleServer>) {
112        self.peer = Some(peer);
113    }
114
115    fn list_tools(
116        &self,
117        request: PaginatedRequestParam,
118        _context: RequestContext<RoleServer>,
119    ) -> impl std::future::Future<Output = Result<ListToolsResult, McpError>> + Send + '_ {
120        let upstream = self.upstream.clone();
121        async move {
122            // Ensure params is always Some — rmcp serializes None as
123            // "params": null which causes some transports to hang.
124            let params = request.or(Some(PaginatedRequestParamInner { cursor: None }));
125            upstream.list_tools(params).await.map_err(|e| {
126                McpError::internal_error(format!("upstream list_tools failed: {e}"), None)
127            })
128        }
129    }
130
131    fn call_tool(
132        &self,
133        request: CallToolRequestParam,
134        _context: RequestContext<RoleServer>,
135    ) -> impl std::future::Future<Output = Result<CallToolResult, McpError>> + Send + '_ {
136        // Load the current engine snapshot (lock-free, zero-copy on the hot path).
137        // If a hot reload swaps the engine mid-flight, this request finishes
138        // with the snapshot it started with.
139        let filter = self.filter.load_full();
140        let tracker = self.tracker.clone();
141        let preset = filter
142            .config()
143            .preset
144            .clone()
145            .unwrap_or_else(|| "generic".to_string());
146        let tool_name = request.name.to_string();
147        let upstream = self.upstream.clone();
148
149        async move {
150            let result = upstream.call_tool(request).await.map_err(|e| {
151                McpError::internal_error(format!("upstream call_tool failed: {e}"), None)
152            })?;
153
154            let filtered_content: Vec<Content> = result
155                .content
156                .into_iter()
157                .map(|content| {
158                    filter_content(&filter, &tool_name, content, tracker.as_ref(), &preset)
159                })
160                .collect();
161
162            Ok(CallToolResult {
163                content: filtered_content,
164                is_error: result.is_error,
165            })
166        }
167    }
168}
169
170/// MCP client handler for the upstream server connection.
171///
172/// `ProxyClient` maintains the peer handle to the upstream MCP server. It
173/// implements [`ClientHandler`] with default behavior — no custom notification
174/// handling is needed since all filtering happens on the server side.
175///
176/// # Examples
177///
178/// ```no_run
179/// # use mcp_rtk::proxy::ProxyClient;
180/// let client = ProxyClient::new();
181/// ```
182#[derive(Clone, Default)]
183pub struct ProxyClient {
184    /// Peer handle to the upstream MCP server.
185    peer: Option<Peer<RoleClient>>,
186}
187
188impl ProxyClient {
189    /// Create a new proxy client with no peer connection.
190    pub fn new() -> Self {
191        Self::default()
192    }
193}
194
195impl ClientHandler for ProxyClient {
196    fn get_info(&self) -> ClientInfo {
197        ClientInfo {
198            protocol_version: Default::default(),
199            capabilities: Default::default(),
200            client_info: Implementation {
201                name: "mcp-rtk-client".into(),
202                version: env!("CARGO_PKG_VERSION").into(),
203            },
204        }
205    }
206
207    fn get_peer(&self) -> Option<Peer<RoleClient>> {
208        self.peer.clone()
209    }
210
211    fn set_peer(&mut self, peer: Peer<RoleClient>) {
212        self.peer = Some(peer);
213    }
214}
215
216/// Apply the filter pipeline to a single MCP content block.
217///
218/// Only text content is filtered; images and resources pass through unchanged.
219/// If a [`Tracker`] is provided, the raw and filtered sizes are recorded.
220fn filter_content(
221    filter: &FilterEngine,
222    tool_name: &str,
223    content: Content,
224    tracker: Option<&Arc<Tracker>>,
225    preset: &str,
226) -> Content {
227    match &content.raw {
228        RawContent::Text(text_content) => {
229            let raw = &text_content.text;
230            let filtered = filter.filter(tool_name, raw);
231
232            if let Some(tracker) = tracker {
233                if let Err(e) = tracker.track(tool_name, raw, &filtered, preset) {
234                    tracing::warn!("Failed to track tool call: {e}");
235                }
236            }
237
238            let raw_len = raw.len().max(1);
239            tracing::debug!(
240                tool = tool_name,
241                raw_len = raw.len(),
242                filtered_len = filtered.len(),
243                savings_pct = format!(
244                    "{:.1}%",
245                    (1.0 - filtered.len() as f64 / raw_len as f64) * 100.0
246                ),
247                "Filtered tool response"
248            );
249
250            Annotated {
251                raw: RawContent::Text(RawTextContent { text: filtered }),
252                annotations: content.annotations,
253            }
254        }
255        _ => content,
256    }
257}