mcp_rtk/proxy.rs
1//! MCP proxy server and client handlers.
2//!
3//! This module implements the core proxy logic: [`ProxyServer`] acts as an MCP
4//! server for Claude (served over stdio), forwarding every request to the
5//! upstream MCP server via [`ProxyClient`]. Tool call responses pass through the
6//! [`FilterEngine`] before being returned to Claude, compressing JSON payloads
7//! by 60-90%.
8//!
9//! # Architecture
10//!
11//! ```text
12//! Claude <-(stdio)-> ProxyServer <-(child-process)-> upstream MCP server
13//! | ^
14//! FilterEngine ProxyClient
15//! ```
16//!
17//! The upstream connection is established lazily: the stdio server starts
18//! immediately so Claude Code can complete its MCP handshake, while the
19//! upstream connection is initialized in the background.
20
21use std::sync::Arc;
22
23use arc_swap::ArcSwap;
24use rmcp::handler::client::ClientHandler;
25use rmcp::handler::server::ServerHandler;
26use rmcp::model::*;
27use rmcp::service::{Peer, RequestContext, RoleClient, RoleServer};
28use rmcp::Error as McpError;
29
30use crate::filter::FilterEngine;
31use crate::tracking::Tracker;
32
33/// MCP server handler that proxies requests to an upstream MCP server.
34///
35/// `ProxyServer` intercepts every `call_tool` response and applies the
36/// [`FilterEngine`] pipeline before returning results to Claude, while
37/// `list_tools` responses are forwarded as-is.
38///
39/// The filter engine is held behind an [`ArcSwap`] so it can be atomically
40/// replaced at runtime when external presets change on disk (hot reload).
41///
42/// # Examples
43///
44/// ```no_run
45/// # use std::sync::Arc;
46/// # use arc_swap::ArcSwap;
47/// # use mcp_rtk::config::Config;
48/// # use mcp_rtk::filter::FilterEngine;
49/// # use mcp_rtk::proxy::ProxyServer;
50/// # use rmcp::service::{Peer, RoleClient};
51/// let config = Arc::new(Config::from_upstream(&["npx", "some-mcp"], None).unwrap());
52/// let engine = Arc::new(ArcSwap::from(Arc::new(FilterEngine::new(config))));
53/// // let proxy = ProxyServer::new(engine, None, upstream_peer);
54/// ```
55#[derive(Clone)]
56pub struct ProxyServer {
57 /// Handle to the upstream MCP server.
58 upstream: Peer<RoleClient>,
59 /// The filter engine, atomically swappable for hot reload.
60 filter: Arc<ArcSwap<FilterEngine>>,
61 /// Optional token-savings tracker (SQLite-backed).
62 tracker: Option<Arc<Tracker>>,
63 /// Peer handle for the downstream (Claude) connection.
64 peer: Option<Peer<RoleServer>>,
65}
66
67impl ProxyServer {
68 /// Create a new proxy server with an already-connected upstream peer.
69 ///
70 /// # Arguments
71 ///
72 /// * `engine` — The shared, hot-reloadable filter engine.
73 /// * `tracker` — Optional [`Tracker`] for recording token savings.
74 /// * `upstream` — Peer handle to the upstream MCP server.
75 pub fn new(
76 engine: Arc<ArcSwap<FilterEngine>>,
77 tracker: Option<Arc<Tracker>>,
78 upstream: Peer<RoleClient>,
79 ) -> Self {
80 Self {
81 upstream,
82 filter: engine,
83 tracker,
84 peer: None,
85 }
86 }
87}
88
89impl ServerHandler for ProxyServer {
90 fn get_info(&self) -> ServerInfo {
91 ServerInfo {
92 protocol_version: Default::default(),
93 capabilities: ServerCapabilities {
94 tools: Some(ToolsCapability {
95 list_changed: Some(true),
96 }),
97 ..Default::default()
98 },
99 server_info: Implementation {
100 name: "mcp-rtk".into(),
101 version: env!("CARGO_PKG_VERSION").into(),
102 },
103 instructions: Some("Token-optimizing MCP proxy".into()),
104 }
105 }
106
107 fn get_peer(&self) -> Option<Peer<RoleServer>> {
108 self.peer.clone()
109 }
110
111 fn set_peer(&mut self, peer: Peer<RoleServer>) {
112 self.peer = Some(peer);
113 }
114
115 fn list_tools(
116 &self,
117 request: PaginatedRequestParam,
118 _context: RequestContext<RoleServer>,
119 ) -> impl std::future::Future<Output = Result<ListToolsResult, McpError>> + Send + '_ {
120 let upstream = self.upstream.clone();
121 async move {
122 // Ensure params is always Some — rmcp serializes None as
123 // "params": null which causes some transports to hang.
124 let params = request.or(Some(PaginatedRequestParamInner { cursor: None }));
125 upstream.list_tools(params).await.map_err(|e| {
126 McpError::internal_error(format!("upstream list_tools failed: {e}"), None)
127 })
128 }
129 }
130
131 fn call_tool(
132 &self,
133 request: CallToolRequestParam,
134 _context: RequestContext<RoleServer>,
135 ) -> impl std::future::Future<Output = Result<CallToolResult, McpError>> + Send + '_ {
136 // Load the current engine snapshot (lock-free, zero-copy on the hot path).
137 // If a hot reload swaps the engine mid-flight, this request finishes
138 // with the snapshot it started with.
139 let filter = self.filter.load_full();
140 let tracker = self.tracker.clone();
141 let preset = filter
142 .config()
143 .preset
144 .clone()
145 .unwrap_or_else(|| "generic".to_string());
146 let tool_name = request.name.to_string();
147 let upstream = self.upstream.clone();
148
149 async move {
150 let result = upstream.call_tool(request).await.map_err(|e| {
151 McpError::internal_error(format!("upstream call_tool failed: {e}"), None)
152 })?;
153
154 let filtered_content: Vec<Content> = result
155 .content
156 .into_iter()
157 .map(|content| {
158 filter_content(&filter, &tool_name, content, tracker.as_ref(), &preset)
159 })
160 .collect();
161
162 Ok(CallToolResult {
163 content: filtered_content,
164 is_error: result.is_error,
165 })
166 }
167 }
168}
169
170/// MCP client handler for the upstream server connection.
171///
172/// `ProxyClient` maintains the peer handle to the upstream MCP server. It
173/// implements [`ClientHandler`] with default behavior — no custom notification
174/// handling is needed since all filtering happens on the server side.
175///
176/// # Examples
177///
178/// ```no_run
179/// # use mcp_rtk::proxy::ProxyClient;
180/// let client = ProxyClient::new();
181/// ```
182#[derive(Clone, Default)]
183pub struct ProxyClient {
184 /// Peer handle to the upstream MCP server.
185 peer: Option<Peer<RoleClient>>,
186}
187
188impl ProxyClient {
189 /// Create a new proxy client with no peer connection.
190 pub fn new() -> Self {
191 Self::default()
192 }
193}
194
195impl ClientHandler for ProxyClient {
196 fn get_info(&self) -> ClientInfo {
197 ClientInfo {
198 protocol_version: Default::default(),
199 capabilities: Default::default(),
200 client_info: Implementation {
201 name: "mcp-rtk-client".into(),
202 version: env!("CARGO_PKG_VERSION").into(),
203 },
204 }
205 }
206
207 fn get_peer(&self) -> Option<Peer<RoleClient>> {
208 self.peer.clone()
209 }
210
211 fn set_peer(&mut self, peer: Peer<RoleClient>) {
212 self.peer = Some(peer);
213 }
214}
215
216/// Apply the filter pipeline to a single MCP content block.
217///
218/// Only text content is filtered; images and resources pass through unchanged.
219/// If a [`Tracker`] is provided, the raw and filtered sizes are recorded.
220fn filter_content(
221 filter: &FilterEngine,
222 tool_name: &str,
223 content: Content,
224 tracker: Option<&Arc<Tracker>>,
225 preset: &str,
226) -> Content {
227 match &content.raw {
228 RawContent::Text(text_content) => {
229 let raw = &text_content.text;
230 let filtered = filter.filter(tool_name, raw);
231
232 if let Some(tracker) = tracker {
233 if let Err(e) = tracker.track(tool_name, raw, &filtered, preset) {
234 tracing::warn!("Failed to track tool call: {e}");
235 }
236 }
237
238 let raw_len = raw.len().max(1);
239 tracing::debug!(
240 tool = tool_name,
241 raw_len = raw.len(),
242 filtered_len = filtered.len(),
243 savings_pct = format!(
244 "{:.1}%",
245 (1.0 - filtered.len() as f64 / raw_len as f64) * 100.0
246 ),
247 "Filtered tool response"
248 );
249
250 Annotated {
251 raw: RawContent::Text(RawTextContent { text: filtered }),
252 annotations: content.annotations,
253 }
254 }
255 _ => content,
256 }
257}