kvlar_proxy/stdio.rs
1//! MCP stdio transport implementation.
2//!
3//! Implements a stdio-based proxy that spawns the upstream MCP server as a
4//! child process and communicates via stdin/stdout pipes. This is how
5//! Claude Desktop, Cursor, and other MCP clients invoke tool servers.
6//!
7//! ## Architecture
8//!
9//! ```text
10//! MCP Client (stdin) → Kvlar Proxy → Child Process (stdin) → MCP Server
11//! MCP Client (stdout) ← Kvlar Proxy ← Child Process (stdout) ← MCP Server
12//! ```
13
14use std::process::Stdio;
15use std::sync::Arc;
16
17use kvlar_audit::AuditLogger;
18use kvlar_core::Engine;
19use tokio::io::BufReader;
20use tokio::process::Command;
21use tokio::sync::{Mutex, RwLock};
22
23use crate::approval::ApprovalBackend;
24use crate::handler;
25use crate::shutdown;
26
27/// MCP stdio transport proxy.
28///
29/// Spawns the upstream MCP server as a child process and proxies
30/// MCP messages between the client (our stdin/stdout) and the server
31/// (child process stdin/stdout), applying policy evaluation on tool calls.
32pub struct StdioTransport {
33 engine: Arc<RwLock<Engine>>,
34 audit: Arc<Mutex<AuditLogger>>,
35 command: String,
36 args: Vec<String>,
37 fail_open: bool,
38 approval_backend: Option<Arc<dyn ApprovalBackend>>,
39}
40
41impl StdioTransport {
42 /// Creates a new stdio transport proxy.
43 pub fn new(
44 engine: Engine,
45 audit: AuditLogger,
46 command: String,
47 args: Vec<String>,
48 fail_open: bool,
49 ) -> Self {
50 Self {
51 engine: Arc::new(RwLock::new(engine)),
52 audit: Arc::new(Mutex::new(audit)),
53 command,
54 args,
55 fail_open,
56 approval_backend: None,
57 }
58 }
59
60 /// Creates a new stdio transport proxy with a shared engine reference.
61 ///
62 /// Use this when hot-reload is enabled — the caller retains a clone
63 /// of the `Arc<RwLock<Engine>>` and can swap the engine atomically
64 /// while the proxy is running.
65 pub fn with_shared_engine(
66 engine: Arc<RwLock<Engine>>,
67 audit: AuditLogger,
68 command: String,
69 args: Vec<String>,
70 fail_open: bool,
71 ) -> Self {
72 Self {
73 engine,
74 audit: Arc::new(Mutex::new(audit)),
75 command,
76 args,
77 fail_open,
78 approval_backend: None,
79 }
80 }
81
82 /// Sets the approval backend for handling `RequireApproval` decisions.
83 pub fn with_approval_backend(mut self, backend: Arc<dyn ApprovalBackend>) -> Self {
84 self.approval_backend = Some(backend);
85 self
86 }
87
88 /// Returns a reference to the shared engine (for hot-reload wiring).
89 pub fn engine(&self) -> &Arc<RwLock<Engine>> {
90 &self.engine
91 }
92
93 /// Runs the stdio proxy with graceful shutdown.
94 ///
95 /// Spawns the upstream MCP server as a child process, then proxies
96 /// all MCP messages through the policy engine. This function blocks
97 /// until the client disconnects (stdin EOF), the child process exits,
98 /// or a shutdown signal (SIGTERM/SIGINT) is received.
99 ///
100 /// On shutdown signal:
101 /// 1. Active proxy loop is allowed to drain (up to 30s by default)
102 /// 2. Audit log is flushed
103 /// 3. Upstream child process is terminated cleanly
104 pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
105 tracing::info!(
106 command = %self.command,
107 args = ?self.args,
108 "spawning upstream MCP server"
109 );
110
111 // Install signal handlers — token is cancelled on SIGTERM/SIGINT
112 let shutdown_token = shutdown::signal_shutdown_token();
113
114 // Spawn the upstream MCP server as a child process
115 let mut child = Command::new(&self.command)
116 .args(&self.args)
117 .stdin(Stdio::piped())
118 .stdout(Stdio::piped())
119 .stderr(Stdio::inherit()) // Pass server stderr through to our stderr
120 .spawn()
121 .map_err(|e| format!("failed to spawn upstream command '{}': {}", self.command, e))?;
122
123 let child_stdin = child.stdin.take().ok_or("failed to capture child stdin")?;
124 let child_stdout = child
125 .stdout
126 .take()
127 .ok_or("failed to capture child stdout")?;
128
129 // Our stdin = client reading (MCP client writes to us)
130 let client_reader = BufReader::new(tokio::io::stdin());
131 // Our stdout = client writing (we write responses to MCP client)
132 let client_writer = tokio::io::stdout();
133 // Child stdin = upstream writing (we forward messages to the server)
134 let upstream_writer = child_stdin;
135 // Child stdout = upstream reading (server sends responses to us)
136 let upstream_reader = BufReader::new(child_stdout);
137
138 tracing::info!("stdio proxy running");
139
140 // Run the proxy loop with signal-aware shutdown
141 let proxy_result = tokio::select! {
142 result = handler::run_proxy_loop(
143 client_reader,
144 Arc::new(Mutex::new(client_writer)),
145 upstream_reader,
146 Arc::new(Mutex::new(upstream_writer)),
147 self.engine.clone(),
148 self.audit.clone(),
149 self.fail_open,
150 self.approval_backend.clone(),
151 ) => {
152 tracing::info!("proxy loop ended normally");
153 result
154 }
155 _ = shutdown_token.cancelled() => {
156 tracing::info!("shutdown signal received, draining...");
157 // Give in-flight requests a moment to complete
158 tokio::time::sleep(std::time::Duration::from_millis(500)).await;
159 Ok(())
160 }
161 };
162
163 // Flush audit log
164 {
165 let mut audit = self.audit.lock().await;
166 audit.flush();
167 tracing::info!("audit log flushed");
168 }
169
170 // Clean up: terminate child process gracefully
171 tracing::info!("waiting for child process to exit");
172 let _ = child.kill().await;
173 let exit_status = child.wait().await;
174 match exit_status {
175 Ok(status) => tracing::info!(status = %status, "child process exited"),
176 Err(e) => tracing::warn!(error = %e, "error waiting for child process"),
177 }
178
179 proxy_result.map_err(|e| -> Box<dyn std::error::Error> { e })
180 }
181}