rmcp_mux/lib.rs
1//! # rmcp_mux - MCP Server Multiplexer
2//!
3//! A library for multiplexing MCP (Model Context Protocol) servers, allowing
4//! a single server process to serve multiple clients via Unix sockets.
5//!
6//! ## Features
7//!
8//! - **Single server, multiple clients**: One MCP server child process serves many clients
9//! - **Initialize caching**: First initialize response is cached for subsequent clients
10//! - **Request ID rewriting**: Transparent request routing with ID collision avoidance
11//! - **Automatic restarts**: Exponential backoff restart of failed server processes
12//! - **Active client limiting**: Semaphore-based concurrency control
13//!
14//! ## Usage as Library
15//!
//! ```rust,no_run
//! use rmcp_mux::{MuxConfig, run_mux_server};
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//!     let config = MuxConfig::new("/tmp/my-mcp.sock", "npx")
//!         .with_args(vec!["-y".into(), "@anthropic/mcp-server".into()])
//!         .with_max_clients(10)
//!         .with_service_name("my-mcp-server");
//!
//!     run_mux_server(config).await
//! }
//! ```
29//!
30//! ## Usage with Multiple Mux Instances
31//!
//! ```rust,no_run
//! use rmcp_mux::{MuxConfig, spawn_mux_server, MuxHandle};
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//!     // Spawn multiple mux servers in a single process
//!     let handles: Vec<MuxHandle> = vec![
//!         spawn_mux_server(MuxConfig::new("/tmp/mcp1.sock", "server1")).await?,
//!         spawn_mux_server(MuxConfig::new("/tmp/mcp2.sock", "server2")).await?,
//!     ];
//!
//!     // Wait for all to complete (or shutdown signal)
//!     for handle in handles {
//!         handle.wait().await?;
//!     }
//!     Ok(())
//! }
//! ```
50
51use std::path::{Path, PathBuf};
52use std::time::Duration;
53
54use anyhow::Result;
55use tokio_util::sync::CancellationToken;
56
57// ─────────────────────────────────────────────────────────────────────────────
58// Public modules
59// ─────────────────────────────────────────────────────────────────────────────
60
61pub mod config;
62pub mod runtime;
63pub mod state;
64
65// CLI-only modules (feature-gated)
66#[cfg(feature = "cli")]
67pub mod scan;
68#[cfg(feature = "tray")]
69pub mod tray;
70#[cfg(feature = "cli")]
71pub mod wizard;
72
73// ─────────────────────────────────────────────────────────────────────────────
74// Re-exports for convenience
75// ─────────────────────────────────────────────────────────────────────────────
76
77pub use config::{CliOptions, Config, ResolvedParams, ServerConfig};
78pub use runtime::{MAX_PENDING, MAX_QUEUE, health_check, run_mux, run_mux_internal, run_proxy};
79pub use state::{MuxState, ServerStatus, StatusSnapshot};
80
81// ─────────────────────────────────────────────────────────────────────────────
82// Library-first configuration builder
83// ─────────────────────────────────────────────────────────────────────────────
84
/// Configuration for embedding rmcp_mux in your application.
///
/// Create a value with [`MuxConfig::new`] (which fills in every default) and
/// refine it with the chainable `with_*` builder methods:
///
/// ```rust
/// use rmcp_mux::MuxConfig;
/// use std::time::Duration;
///
/// let config = MuxConfig::new("/tmp/my-mcp.sock", "npx")
///     .with_args(vec!["-y".into(), "my-mcp-server".into()])
///     .with_max_clients(10)
///     .with_request_timeout(Duration::from_secs(60));
/// ```
#[derive(Debug, Clone)]
pub struct MuxConfig {
    /// Unix socket path for the mux listener
    pub socket: PathBuf,
    /// MCP server command (e.g., "npx", "node", "python")
    pub cmd: String,
    /// Arguments passed to the MCP server command
    pub args: Vec<String>,
    /// Maximum concurrent active clients (default: 5)
    pub max_clients: usize,
    /// Service name for logging and status (default: derived from the socket
    /// file name, see [`MuxConfig::service_name`])
    pub service_name: Option<String>,
    /// Log level (default: "info")
    pub log_level: String,
    /// Lazy start - only spawn server on first client connect (default: false)
    pub lazy_start: bool,
    /// Maximum request size in bytes (default: 1 MiB = 1_048_576)
    pub max_request_bytes: usize,
    /// Request timeout before aborting (default: 30s)
    pub request_timeout: Duration,
    /// Initial restart backoff (default: 1s)
    pub restart_backoff: Duration,
    /// Maximum restart backoff (default: 30s)
    pub restart_backoff_max: Duration,
    /// Maximum restarts before marking server failed (0 = unlimited, default: 5)
    pub max_restarts: u64,
    /// Optional path to write JSON status snapshots
    pub status_file: Option<PathBuf>,
    /// Enable tray icon (only with "tray" feature, default: false)
    pub tray_enabled: bool,
}

impl MuxConfig {
    /// Create a new `MuxConfig` with the required parameters and default
    /// values for everything else.
    ///
    /// # Arguments
    /// * `socket` - Unix socket path for the mux listener
    /// * `cmd` - MCP server command to execute
    pub fn new(socket: impl Into<PathBuf>, cmd: impl Into<String>) -> Self {
        Self {
            socket: socket.into(),
            cmd: cmd.into(),
            args: Vec::new(),
            max_clients: 5,
            service_name: None,
            log_level: "info".to_string(),
            lazy_start: false,
            max_request_bytes: 1_048_576,
            request_timeout: Duration::from_secs(30),
            restart_backoff: Duration::from_secs(1),
            restart_backoff_max: Duration::from_secs(30),
            max_restarts: 5,
            status_file: None,
            tray_enabled: false,
        }
    }

    /// Set command arguments (replaces any previously set arguments).
    #[must_use]
    pub fn with_args(mut self, args: Vec<String>) -> Self {
        self.args = args;
        self
    }

    /// Set maximum concurrent clients.
    #[must_use]
    pub fn with_max_clients(mut self, max: usize) -> Self {
        self.max_clients = max;
        self
    }

    /// Set service name for logging and status.
    #[must_use]
    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
        self.service_name = Some(name.into());
        self
    }

    /// Set log level (trace, debug, info, warn, error).
    #[must_use]
    pub fn with_log_level(mut self, level: impl Into<String>) -> Self {
        self.log_level = level.into();
        self
    }

    /// Enable lazy start (spawn server on first client connect).
    #[must_use]
    pub fn with_lazy_start(mut self, lazy: bool) -> Self {
        self.lazy_start = lazy;
        self
    }

    /// Set maximum request size in bytes.
    #[must_use]
    pub fn with_max_request_bytes(mut self, bytes: usize) -> Self {
        self.max_request_bytes = bytes;
        self
    }

    /// Set request timeout.
    #[must_use]
    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
        self.request_timeout = timeout;
        self
    }

    /// Set restart backoff parameters.
    ///
    /// `initial` is stored as the first backoff and `max` as the upper bound;
    /// the two values are stored as given, without validation.
    #[must_use]
    pub fn with_restart_backoff(mut self, initial: Duration, max: Duration) -> Self {
        self.restart_backoff = initial;
        self.restart_backoff_max = max;
        self
    }

    /// Set maximum restarts (0 = unlimited).
    #[must_use]
    pub fn with_max_restarts(mut self, max: u64) -> Self {
        self.max_restarts = max;
        self
    }

    /// Set status file path for JSON snapshots.
    #[must_use]
    pub fn with_status_file(mut self, path: impl Into<PathBuf>) -> Self {
        self.status_file = Some(path.into());
        self
    }

    /// Enable tray icon (requires "tray" feature).
    #[must_use]
    pub fn with_tray(mut self, enabled: bool) -> Self {
        self.tray_enabled = enabled;
        self
    }

    /// Get the service name, deriving one from the socket path if none was set.
    ///
    /// Resolution order:
    /// 1. the name set via [`MuxConfig::with_service_name`], returned as-is;
    /// 2. the part of the socket file name before its first `.`
    ///    (`/tmp/my-mcp.sock` -> `my-mcp`);
    /// 3. the literal `"rmcp_mux"` when the socket path has no file name or
    ///    the derived part would be empty (e.g. `/tmp/.sock`).
    pub fn service_name(&self) -> String {
        if let Some(explicit) = &self.service_name {
            return explicit.clone();
        }

        self.socket
            .file_name()
            .and_then(|file| {
                let file = file.to_string_lossy();
                // `split` always yields at least one (possibly empty) piece.
                let prefix = file.split('.').next().unwrap_or_default();
                // A dot-leading file name would otherwise produce an empty
                // service name; treat that like "no usable name".
                if prefix.is_empty() {
                    None
                } else {
                    Some(prefix.to_string())
                }
            })
            .unwrap_or_else(|| "rmcp_mux".to_string())
    }
}
232
233impl From<MuxConfig> for ResolvedParams {
234 fn from(cfg: MuxConfig) -> Self {
235 let service_name = cfg.service_name();
236 ResolvedParams {
237 socket: cfg.socket,
238 cmd: cfg.cmd,
239 args: cfg.args,
240 max_clients: cfg.max_clients,
241 tray_enabled: cfg.tray_enabled,
242 log_level: cfg.log_level,
243 service_name,
244 lazy_start: cfg.lazy_start,
245 max_request_bytes: cfg.max_request_bytes,
246 request_timeout: cfg.request_timeout,
247 restart_backoff: cfg.restart_backoff,
248 restart_backoff_max: cfg.restart_backoff_max,
249 max_restarts: cfg.max_restarts,
250 status_file: cfg.status_file,
251 }
252 }
253}
254
255// ─────────────────────────────────────────────────────────────────────────────
256// Library entry points
257// ─────────────────────────────────────────────────────────────────────────────
258
259/// Run a mux server blocking until shutdown.
260///
261/// This is the simplest way to run a mux server. It blocks until
262/// a shutdown signal (Ctrl+C) is received.
263///
264/// # Example
265/// ```rust,no_run
266/// use rmcp_mux::{MuxConfig, run_mux_server};
267///
268/// #[tokio::main]
269/// async fn main() -> anyhow::Result<()> {
270/// let config = MuxConfig::new("/tmp/my-mcp.sock", "my-server");
271/// run_mux_server(config).await
272/// }
273/// ```
274pub async fn run_mux_server(config: MuxConfig) -> Result<()> {
275 let params: ResolvedParams = config.into();
276 run_mux(params).await
277}
278
279/// Handle for a spawned mux server.
280///
281/// Use this to manage multiple mux servers in a single process.
282pub struct MuxHandle {
283 shutdown: CancellationToken,
284 join_handle: tokio::task::JoinHandle<Result<()>>,
285}
286
287impl MuxHandle {
288 /// Request shutdown of this mux server.
289 pub fn shutdown(&self) {
290 self.shutdown.cancel();
291 }
292
293 /// Wait for the mux server to complete.
294 pub async fn wait(self) -> Result<()> {
295 self.join_handle.await?
296 }
297
298 /// Check if the mux server is still running.
299 pub fn is_running(&self) -> bool {
300 !self.join_handle.is_finished()
301 }
302}
303
304/// Spawn a mux server as a background task.
305///
306/// Returns a handle that can be used to shutdown the server
307/// or wait for it to complete.
308///
309/// # Example
310/// ```rust,no_run
311/// use rmcp_mux::{MuxConfig, spawn_mux_server};
312///
313/// #[tokio::main]
314/// async fn main() -> anyhow::Result<()> {
315/// let handle = spawn_mux_server(MuxConfig::new("/tmp/mcp.sock", "server")).await?;
316///
317/// // Do other work...
318///
319/// // Later, shutdown and wait
320/// handle.shutdown();
321/// handle.wait().await?;
322/// Ok(())
323/// }
324/// ```
325pub async fn spawn_mux_server(config: MuxConfig) -> Result<MuxHandle> {
326 let shutdown = CancellationToken::new();
327 let params: ResolvedParams = config.into();
328
329 let shutdown_clone = shutdown.clone();
330 let join_handle = tokio::spawn(async move {
331 // Override the internal shutdown signal with our token
332 run_mux_with_shutdown(params, shutdown_clone).await
333 });
334
335 Ok(MuxHandle {
336 shutdown,
337 join_handle,
338 })
339}
340
341/// Run mux with external shutdown control.
342///
343/// This is useful for embedding where you want to control shutdown
344/// programmatically rather than via Ctrl+C.
345pub async fn run_mux_with_shutdown(
346 params: ResolvedParams,
347 shutdown: CancellationToken,
348) -> Result<()> {
349 runtime::run_mux_internal(params, shutdown).await
350}
351
352/// Perform a health check on a mux socket.
353///
354/// Returns Ok if the socket is reachable, Err otherwise.
355pub async fn check_health(socket: impl AsRef<Path>) -> Result<()> {
356 let params = ResolvedParams {
357 socket: socket.as_ref().to_path_buf(),
358 cmd: String::new(),
359 args: Vec::new(),
360 max_clients: 1,
361 tray_enabled: false,
362 log_level: "error".to_string(),
363 service_name: "health-check".to_string(),
364 lazy_start: false,
365 max_request_bytes: 0,
366 request_timeout: Duration::from_secs(5),
367 restart_backoff: Duration::from_secs(1),
368 restart_backoff_max: Duration::from_secs(1),
369 max_restarts: 0,
370 status_file: None,
371 };
372 health_check(¶ms).await
373}
374
375// ─────────────────────────────────────────────────────────────────────────────
376// Version info
377// ─────────────────────────────────────────────────────────────────────────────
378
379/// Library version
380pub const VERSION: &str = env!("CARGO_PKG_VERSION");
381
382/// Library name
383pub const NAME: &str = env!("CARGO_PKG_NAME");