Skip to main content

kvlar_proxy/
stdio.rs

1//! MCP stdio transport implementation.
2//!
3//! Implements a stdio-based proxy that spawns the upstream MCP server as a
4//! child process and communicates via stdin/stdout pipes. This is how
5//! Claude Desktop, Cursor, and other MCP clients invoke tool servers.
6//!
7//! ## Architecture
8//!
9//! ```text
10//! MCP Client (stdin) → Kvlar Proxy → Child Process (stdin) → MCP Server
11//! MCP Client (stdout) ← Kvlar Proxy ← Child Process (stdout) ← MCP Server
12//! ```
13
14use std::process::Stdio;
15use std::sync::Arc;
16
17use kvlar_audit::AuditLogger;
18use kvlar_core::Engine;
19use tokio::io::BufReader;
20use tokio::process::Command;
21use tokio::sync::{Mutex, RwLock};
22
23use crate::approval::ApprovalBackend;
24use crate::handler;
25use crate::shutdown;
26
27/// MCP stdio transport proxy.
28///
29/// Spawns the upstream MCP server as a child process and proxies
30/// MCP messages between the client (our stdin/stdout) and the server
31/// (child process stdin/stdout), applying policy evaluation on tool calls.
32pub struct StdioTransport {
33    engine: Arc<RwLock<Engine>>,
34    audit: Arc<Mutex<AuditLogger>>,
35    command: String,
36    args: Vec<String>,
37    fail_open: bool,
38    approval_backend: Option<Arc<dyn ApprovalBackend>>,
39}
40
41impl StdioTransport {
42    /// Creates a new stdio transport proxy.
43    pub fn new(
44        engine: Engine,
45        audit: AuditLogger,
46        command: String,
47        args: Vec<String>,
48        fail_open: bool,
49    ) -> Self {
50        Self {
51            engine: Arc::new(RwLock::new(engine)),
52            audit: Arc::new(Mutex::new(audit)),
53            command,
54            args,
55            fail_open,
56            approval_backend: None,
57        }
58    }
59
60    /// Creates a new stdio transport proxy with a shared engine reference.
61    ///
62    /// Use this when hot-reload is enabled — the caller retains a clone
63    /// of the `Arc<RwLock<Engine>>` and can swap the engine atomically
64    /// while the proxy is running.
65    pub fn with_shared_engine(
66        engine: Arc<RwLock<Engine>>,
67        audit: AuditLogger,
68        command: String,
69        args: Vec<String>,
70        fail_open: bool,
71    ) -> Self {
72        Self {
73            engine,
74            audit: Arc::new(Mutex::new(audit)),
75            command,
76            args,
77            fail_open,
78            approval_backend: None,
79        }
80    }
81
82    /// Sets the approval backend for handling `RequireApproval` decisions.
83    pub fn with_approval_backend(mut self, backend: Arc<dyn ApprovalBackend>) -> Self {
84        self.approval_backend = Some(backend);
85        self
86    }
87
88    /// Returns a reference to the shared engine (for hot-reload wiring).
89    pub fn engine(&self) -> &Arc<RwLock<Engine>> {
90        &self.engine
91    }
92
93    /// Runs the stdio proxy with graceful shutdown.
94    ///
95    /// Spawns the upstream MCP server as a child process, then proxies
96    /// all MCP messages through the policy engine. This function blocks
97    /// until the client disconnects (stdin EOF), the child process exits,
98    /// or a shutdown signal (SIGTERM/SIGINT) is received.
99    ///
100    /// On shutdown signal:
101    /// 1. Active proxy loop is allowed to drain (up to 30s by default)
102    /// 2. Audit log is flushed
103    /// 3. Upstream child process is terminated cleanly
104    pub async fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
105        tracing::info!(
106            command = %self.command,
107            args = ?self.args,
108            "spawning upstream MCP server"
109        );
110
111        // Install signal handlers — token is cancelled on SIGTERM/SIGINT
112        let shutdown_token = shutdown::signal_shutdown_token();
113
114        // Spawn the upstream MCP server as a child process
115        let mut child = Command::new(&self.command)
116            .args(&self.args)
117            .stdin(Stdio::piped())
118            .stdout(Stdio::piped())
119            .stderr(Stdio::inherit()) // Pass server stderr through to our stderr
120            .spawn()
121            .map_err(|e| format!("failed to spawn upstream command '{}': {}", self.command, e))?;
122
123        let child_stdin = child.stdin.take().ok_or("failed to capture child stdin")?;
124        let child_stdout = child
125            .stdout
126            .take()
127            .ok_or("failed to capture child stdout")?;
128
129        // Our stdin = client reading (MCP client writes to us)
130        let client_reader = BufReader::new(tokio::io::stdin());
131        // Our stdout = client writing (we write responses to MCP client)
132        let client_writer = tokio::io::stdout();
133        // Child stdin = upstream writing (we forward messages to the server)
134        let upstream_writer = child_stdin;
135        // Child stdout = upstream reading (server sends responses to us)
136        let upstream_reader = BufReader::new(child_stdout);
137
138        tracing::info!("stdio proxy running");
139
140        // Run the proxy loop with signal-aware shutdown
141        let proxy_result = tokio::select! {
142            result = handler::run_proxy_loop(
143                client_reader,
144                Arc::new(Mutex::new(client_writer)),
145                upstream_reader,
146                Arc::new(Mutex::new(upstream_writer)),
147                self.engine.clone(),
148                self.audit.clone(),
149                self.fail_open,
150                self.approval_backend.clone(),
151            ) => {
152                tracing::info!("proxy loop ended normally");
153                result
154            }
155            _ = shutdown_token.cancelled() => {
156                tracing::info!("shutdown signal received, draining...");
157                // Give in-flight requests a moment to complete
158                tokio::time::sleep(std::time::Duration::from_millis(500)).await;
159                Ok(())
160            }
161        };
162
163        // Flush audit log
164        {
165            let mut audit = self.audit.lock().await;
166            audit.flush();
167            tracing::info!("audit log flushed");
168        }
169
170        // Clean up: terminate child process gracefully
171        tracing::info!("waiting for child process to exit");
172        let _ = child.kill().await;
173        let exit_status = child.wait().await;
174        match exit_status {
175            Ok(status) => tracing::info!(status = %status, "child process exited"),
176            Err(e) => tracing::warn!(error = %e, "error waiting for child process"),
177        }
178
179        proxy_result.map_err(|e| -> Box<dyn std::error::Error> { e })
180    }
181}