rmcp_mux/
lib.rs

1//! # rmcp_mux - MCP Server Multiplexer
2//!
3//! A library for multiplexing MCP (Model Context Protocol) servers, allowing
4//! a single server process to serve multiple clients via Unix sockets.
5//!
6//! ## Features
7//!
8//! - **Single server, multiple clients**: One MCP server child process serves many clients
9//! - **Initialize caching**: First initialize response is cached for subsequent clients
10//! - **Request ID rewriting**: Transparent request routing with ID collision avoidance
11//! - **Automatic restarts**: Exponential backoff restart of failed server processes
12//! - **Active client limiting**: Semaphore-based concurrency control
13//!
14//! ## Usage as Library
15//!
16//! ```rust,no_run
17//! use rmcp_mux::{MuxConfig, run_mux_server};
18//!
19//! #[tokio::main]
20//! async fn main() -> anyhow::Result<()> {
21//!     let config = MuxConfig::new("/tmp/my-mcp.sock", "npx")
22//!         .with_args(vec!["-y".into(), "@anthropic/mcp-server".into()])
23//!         .with_max_clients(10)
24//!         .with_service_name("my-mcp-server");
25//!
26//!     run_mux_server(config).await
27//! }
28//! ```
29//!
30//! ## Usage with Multiple Mux Instances
31//!
32//! ```rust,no_run
33//! use rmcp_mux::{MuxConfig, spawn_mux_server, MuxHandle};
34//!
35//! #[tokio::main]
36//! async fn main() -> anyhow::Result<()> {
37//!     // Spawn multiple mux servers in a single process
38//!     let handles: Vec<MuxHandle> = vec![
39//!         spawn_mux_server(MuxConfig::new("/tmp/mcp1.sock", "server1")).await?,
40//!         spawn_mux_server(MuxConfig::new("/tmp/mcp2.sock", "server2")).await?,
41//!     ];
42//!
43//!     // Wait for all to complete (or shutdown signal)
44//!     for handle in handles {
45//!         handle.wait().await?;
46//!     }
47//!     Ok(())
48//! }
49//! ```
50
51use std::path::{Path, PathBuf};
52use std::time::Duration;
53
54use anyhow::Result;
55use tokio_util::sync::CancellationToken;
56
57// ─────────────────────────────────────────────────────────────────────────────
58// Public modules
59// ─────────────────────────────────────────────────────────────────────────────
60
61pub mod config;
62pub mod runtime;
63pub mod state;
64
65// CLI-only modules (feature-gated)
66#[cfg(feature = "cli")]
67pub mod scan;
68#[cfg(feature = "tray")]
69pub mod tray;
70#[cfg(feature = "cli")]
71pub mod wizard;
72
73// ─────────────────────────────────────────────────────────────────────────────
74// Re-exports for convenience
75// ─────────────────────────────────────────────────────────────────────────────
76
77pub use config::{CliOptions, Config, ResolvedParams, ServerConfig};
78pub use runtime::{MAX_PENDING, MAX_QUEUE, health_check, run_mux, run_mux_internal, run_proxy};
79pub use state::{MuxState, ServerStatus, StatusSnapshot};
80
81// ─────────────────────────────────────────────────────────────────────────────
82// Library-first configuration builder
83// ─────────────────────────────────────────────────────────────────────────────
84
85/// Configuration for embedding rmcp_mux in your application.
86///
87/// Use the builder pattern to configure the mux server:
88///
89/// ```rust
90/// use rmcp_mux::MuxConfig;
91/// use std::time::Duration;
92///
93/// let config = MuxConfig::new("/tmp/my-mcp.sock", "npx")
94///     .with_args(vec!["-y".into(), "my-mcp-server".into()])
95///     .with_max_clients(10)
96///     .with_request_timeout(Duration::from_secs(60));
97/// ```
98#[derive(Debug, Clone)]
99pub struct MuxConfig {
100    /// Unix socket path for the mux listener
101    pub socket: PathBuf,
102    /// MCP server command (e.g., "npx", "node", "python")
103    pub cmd: String,
104    /// Arguments passed to the MCP server command
105    pub args: Vec<String>,
106    /// Maximum concurrent active clients (default: 5)
107    pub max_clients: usize,
108    /// Service name for logging and status (default: socket filename)
109    pub service_name: Option<String>,
110    /// Log level (default: "info")
111    pub log_level: String,
112    /// Lazy start - only spawn server on first client connect (default: false)
113    pub lazy_start: bool,
114    /// Maximum request size in bytes (default: 1MB)
115    pub max_request_bytes: usize,
116    /// Request timeout before aborting (default: 30s)
117    pub request_timeout: Duration,
118    /// Initial restart backoff (default: 1s)
119    pub restart_backoff: Duration,
120    /// Maximum restart backoff (default: 30s)
121    pub restart_backoff_max: Duration,
122    /// Maximum restarts before marking server failed (0 = unlimited, default: 5)
123    pub max_restarts: u64,
124    /// Optional path to write JSON status snapshots
125    pub status_file: Option<PathBuf>,
126    /// Enable tray icon (only with "tray" feature, default: false)
127    pub tray_enabled: bool,
128}
129
130impl MuxConfig {
131    /// Create a new MuxConfig with required parameters.
132    ///
133    /// # Arguments
134    /// * `socket` - Unix socket path for the mux listener
135    /// * `cmd` - MCP server command to execute
136    pub fn new(socket: impl Into<PathBuf>, cmd: impl Into<String>) -> Self {
137        Self {
138            socket: socket.into(),
139            cmd: cmd.into(),
140            args: Vec::new(),
141            max_clients: 5,
142            service_name: None,
143            log_level: "info".to_string(),
144            lazy_start: false,
145            max_request_bytes: 1_048_576,
146            request_timeout: Duration::from_secs(30),
147            restart_backoff: Duration::from_secs(1),
148            restart_backoff_max: Duration::from_secs(30),
149            max_restarts: 5,
150            status_file: None,
151            tray_enabled: false,
152        }
153    }
154
155    /// Set command arguments.
156    pub fn with_args(mut self, args: Vec<String>) -> Self {
157        self.args = args;
158        self
159    }
160
161    /// Set maximum concurrent clients.
162    pub fn with_max_clients(mut self, max: usize) -> Self {
163        self.max_clients = max;
164        self
165    }
166
167    /// Set service name for logging and status.
168    pub fn with_service_name(mut self, name: impl Into<String>) -> Self {
169        self.service_name = Some(name.into());
170        self
171    }
172
173    /// Set log level (trace, debug, info, warn, error).
174    pub fn with_log_level(mut self, level: impl Into<String>) -> Self {
175        self.log_level = level.into();
176        self
177    }
178
179    /// Enable lazy start (spawn server on first client connect).
180    pub fn with_lazy_start(mut self, lazy: bool) -> Self {
181        self.lazy_start = lazy;
182        self
183    }
184
185    /// Set maximum request size in bytes.
186    pub fn with_max_request_bytes(mut self, bytes: usize) -> Self {
187        self.max_request_bytes = bytes;
188        self
189    }
190
191    /// Set request timeout.
192    pub fn with_request_timeout(mut self, timeout: Duration) -> Self {
193        self.request_timeout = timeout;
194        self
195    }
196
197    /// Set restart backoff parameters.
198    pub fn with_restart_backoff(mut self, initial: Duration, max: Duration) -> Self {
199        self.restart_backoff = initial;
200        self.restart_backoff_max = max;
201        self
202    }
203
204    /// Set maximum restarts (0 = unlimited).
205    pub fn with_max_restarts(mut self, max: u64) -> Self {
206        self.max_restarts = max;
207        self
208    }
209
210    /// Set status file path for JSON snapshots.
211    pub fn with_status_file(mut self, path: impl Into<PathBuf>) -> Self {
212        self.status_file = Some(path.into());
213        self
214    }
215
216    /// Enable tray icon (requires "tray" feature).
217    pub fn with_tray(mut self, enabled: bool) -> Self {
218        self.tray_enabled = enabled;
219        self
220    }
221
222    /// Get the service name (or derive from socket path).
223    pub fn service_name(&self) -> String {
224        self.service_name.clone().unwrap_or_else(|| {
225            self.socket
226                .file_name()
227                .and_then(|n| n.to_string_lossy().split('.').next().map(|s| s.to_string()))
228                .unwrap_or_else(|| "rmcp_mux".to_string())
229        })
230    }
231}
232
233impl From<MuxConfig> for ResolvedParams {
234    fn from(cfg: MuxConfig) -> Self {
235        let service_name = cfg.service_name();
236        ResolvedParams {
237            socket: cfg.socket,
238            cmd: cfg.cmd,
239            args: cfg.args,
240            max_clients: cfg.max_clients,
241            tray_enabled: cfg.tray_enabled,
242            log_level: cfg.log_level,
243            service_name,
244            lazy_start: cfg.lazy_start,
245            max_request_bytes: cfg.max_request_bytes,
246            request_timeout: cfg.request_timeout,
247            restart_backoff: cfg.restart_backoff,
248            restart_backoff_max: cfg.restart_backoff_max,
249            max_restarts: cfg.max_restarts,
250            status_file: cfg.status_file,
251        }
252    }
253}
254
255// ─────────────────────────────────────────────────────────────────────────────
256// Library entry points
257// ─────────────────────────────────────────────────────────────────────────────
258
259/// Run a mux server blocking until shutdown.
260///
261/// This is the simplest way to run a mux server. It blocks until
262/// a shutdown signal (Ctrl+C) is received.
263///
264/// # Example
265/// ```rust,no_run
266/// use rmcp_mux::{MuxConfig, run_mux_server};
267///
268/// #[tokio::main]
269/// async fn main() -> anyhow::Result<()> {
270///     let config = MuxConfig::new("/tmp/my-mcp.sock", "my-server");
271///     run_mux_server(config).await
272/// }
273/// ```
274pub async fn run_mux_server(config: MuxConfig) -> Result<()> {
275    let params: ResolvedParams = config.into();
276    run_mux(params).await
277}
278
279/// Handle for a spawned mux server.
280///
281/// Use this to manage multiple mux servers in a single process.
282pub struct MuxHandle {
283    shutdown: CancellationToken,
284    join_handle: tokio::task::JoinHandle<Result<()>>,
285}
286
287impl MuxHandle {
288    /// Request shutdown of this mux server.
289    pub fn shutdown(&self) {
290        self.shutdown.cancel();
291    }
292
293    /// Wait for the mux server to complete.
294    pub async fn wait(self) -> Result<()> {
295        self.join_handle.await?
296    }
297
298    /// Check if the mux server is still running.
299    pub fn is_running(&self) -> bool {
300        !self.join_handle.is_finished()
301    }
302}
303
304/// Spawn a mux server as a background task.
305///
306/// Returns a handle that can be used to shutdown the server
307/// or wait for it to complete.
308///
309/// # Example
310/// ```rust,no_run
311/// use rmcp_mux::{MuxConfig, spawn_mux_server};
312///
313/// #[tokio::main]
314/// async fn main() -> anyhow::Result<()> {
315///     let handle = spawn_mux_server(MuxConfig::new("/tmp/mcp.sock", "server")).await?;
316///
317///     // Do other work...
318///
319///     // Later, shutdown and wait
320///     handle.shutdown();
321///     handle.wait().await?;
322///     Ok(())
323/// }
324/// ```
325pub async fn spawn_mux_server(config: MuxConfig) -> Result<MuxHandle> {
326    let shutdown = CancellationToken::new();
327    let params: ResolvedParams = config.into();
328
329    let shutdown_clone = shutdown.clone();
330    let join_handle = tokio::spawn(async move {
331        // Override the internal shutdown signal with our token
332        run_mux_with_shutdown(params, shutdown_clone).await
333    });
334
335    Ok(MuxHandle {
336        shutdown,
337        join_handle,
338    })
339}
340
341/// Run mux with external shutdown control.
342///
343/// This is useful for embedding where you want to control shutdown
344/// programmatically rather than via Ctrl+C.
345pub async fn run_mux_with_shutdown(
346    params: ResolvedParams,
347    shutdown: CancellationToken,
348) -> Result<()> {
349    runtime::run_mux_internal(params, shutdown).await
350}
351
352/// Perform a health check on a mux socket.
353///
354/// Returns Ok if the socket is reachable, Err otherwise.
355pub async fn check_health(socket: impl AsRef<Path>) -> Result<()> {
356    let params = ResolvedParams {
357        socket: socket.as_ref().to_path_buf(),
358        cmd: String::new(),
359        args: Vec::new(),
360        max_clients: 1,
361        tray_enabled: false,
362        log_level: "error".to_string(),
363        service_name: "health-check".to_string(),
364        lazy_start: false,
365        max_request_bytes: 0,
366        request_timeout: Duration::from_secs(5),
367        restart_backoff: Duration::from_secs(1),
368        restart_backoff_max: Duration::from_secs(1),
369        max_restarts: 0,
370        status_file: None,
371    };
372    health_check(&params).await
373}
374
375// ─────────────────────────────────────────────────────────────────────────────
376// Version info
377// ─────────────────────────────────────────────────────────────────────────────
378
379/// Library version
380pub const VERSION: &str = env!("CARGO_PKG_VERSION");
381
382/// Library name
383pub const NAME: &str = env!("CARGO_PKG_NAME");