Skip to main content

fastmcp_client/
builder.rs

1//! Client builder for configuring MCP clients.
2//!
3//! The builder provides a fluent API for constructing MCP clients with
4//! customizable timeout, retry, and subprocess spawn options.
5//!
6//! # Example
7//!
8//! ```ignore
9//! use fastmcp_rust::ClientBuilder;
10//!
11//! let client = ClientBuilder::new()
12//!     .client_info("my-client", "1.0.0")
13//!     .timeout_ms(60_000)
14//!     .max_retries(3)
15//!     .retry_delay_ms(1000)
16//!     .working_dir("/tmp")
17//!     .env("DEBUG", "1")
18//!     .connect_stdio("uvx", &["my-server"])?;
19//! ```
20
21use std::collections::HashMap;
22use std::path::PathBuf;
23use std::process::{Child, Command, Stdio};
24use std::time::Duration;
25
26/// Guard that kills and waits for a child process when dropped.
27/// Call `disarm()` to prevent cleanup (e.g., when ownership transfers to Client).
28struct ChildGuard(Option<Child>);
29
30impl ChildGuard {
31    fn new(child: Child) -> Self {
32        Self(Some(child))
33    }
34
35    /// Takes ownership of the child, preventing cleanup on drop.
36    fn disarm(mut self) -> Child {
37        self.0.take().expect("ChildGuard already disarmed")
38    }
39}
40
41impl Drop for ChildGuard {
42    fn drop(&mut self) {
43        if let Some(mut child) = self.0.take() {
44            // Best effort cleanup - ignore errors
45            let _ = child.kill();
46            let _ = child.wait();
47        }
48    }
49}
50
51use asupersync::Cx;
52use fastmcp_core::{McpError, McpResult};
53use fastmcp_protocol::{
54    ClientCapabilities, ClientInfo, InitializeParams, InitializeResult, JsonRpcMessage,
55    JsonRpcRequest, PROTOCOL_VERSION,
56};
57use fastmcp_transport::{StdioTransport, Transport};
58
59use crate::{Client, ClientSession};
60
61/// Builder for configuring an MCP client.
62///
63/// Use this to configure timeout, retry, and spawn options before
64/// connecting to an MCP server.
65#[derive(Debug, Clone)]
66pub struct ClientBuilder {
67    /// Client identification info.
68    client_info: ClientInfo,
69    /// Request timeout in milliseconds.
70    timeout_ms: u64,
71    /// Maximum number of connection retries.
72    max_retries: u32,
73    /// Delay between retries in milliseconds.
74    retry_delay_ms: u64,
75    /// Working directory for subprocess.
76    working_dir: Option<PathBuf>,
77    /// Environment variables to set for subprocess.
78    env_vars: HashMap<String, String>,
79    /// Whether to inherit parent's environment.
80    inherit_env: bool,
81    /// Client capabilities to advertise.
82    capabilities: ClientCapabilities,
83    /// Whether to defer initialization until first use.
84    auto_initialize: bool,
85}
86
87impl ClientBuilder {
88    /// Creates a new client builder with default settings.
89    ///
90    /// Default configuration:
91    /// - Client name: "fastmcp-client"
92    /// - Timeout: 30 seconds
93    /// - Max retries: 0 (no retries)
94    /// - Retry delay: 1 second
95    /// - Inherit environment: true
96    /// - Auto-initialize: false (initialize immediately on connect)
97    #[must_use]
98    pub fn new() -> Self {
99        Self {
100            client_info: ClientInfo {
101                name: "fastmcp-client".to_owned(),
102                version: env!("CARGO_PKG_VERSION").to_owned(),
103            },
104            timeout_ms: 30_000,
105            max_retries: 0,
106            retry_delay_ms: 1_000,
107            working_dir: None,
108            env_vars: HashMap::new(),
109            inherit_env: true,
110            capabilities: ClientCapabilities::default(),
111            auto_initialize: false,
112        }
113    }
114
115    /// Sets the client name and version.
116    ///
117    /// This information is sent to the server during initialization.
118    #[must_use]
119    pub fn client_info(mut self, name: impl Into<String>, version: impl Into<String>) -> Self {
120        self.client_info = ClientInfo {
121            name: name.into(),
122            version: version.into(),
123        };
124        self
125    }
126
127    /// Sets the request timeout in milliseconds.
128    ///
129    /// This affects how long the client waits for responses from the server.
130    /// Default is 30,000ms (30 seconds).
131    #[must_use]
132    pub fn timeout_ms(mut self, timeout: u64) -> Self {
133        self.timeout_ms = timeout;
134        self
135    }
136
137    /// Sets the maximum number of connection retries.
138    ///
139    /// When connecting to a server fails, the client will retry up to
140    /// this many times before returning an error. Default is 0 (no retries).
141    #[must_use]
142    pub fn max_retries(mut self, retries: u32) -> Self {
143        self.max_retries = retries;
144        self
145    }
146
147    /// Sets the delay between connection retries in milliseconds.
148    ///
149    /// Default is 1,000ms (1 second).
150    #[must_use]
151    pub fn retry_delay_ms(mut self, delay: u64) -> Self {
152        self.retry_delay_ms = delay;
153        self
154    }
155
156    /// Sets the working directory for the subprocess.
157    ///
158    /// If not set, the subprocess inherits the current working directory.
159    #[must_use]
160    pub fn working_dir(mut self, path: impl Into<PathBuf>) -> Self {
161        self.working_dir = Some(path.into());
162        self
163    }
164
165    /// Adds an environment variable for the subprocess.
166    ///
167    /// Multiple calls to this method accumulate environment variables.
168    #[must_use]
169    pub fn env(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
170        self.env_vars.insert(key.into(), value.into());
171        self
172    }
173
174    /// Adds multiple environment variables for the subprocess.
175    #[must_use]
176    pub fn envs<I, K, V>(mut self, vars: I) -> Self
177    where
178        I: IntoIterator<Item = (K, V)>,
179        K: Into<String>,
180        V: Into<String>,
181    {
182        for (key, value) in vars {
183            self.env_vars.insert(key.into(), value.into());
184        }
185        self
186    }
187
188    /// Sets whether to inherit the parent process's environment.
189    ///
190    /// If true (default), the subprocess starts with the parent's environment
191    /// plus any variables added via [`env`](Self::env) or [`envs`](Self::envs).
192    ///
193    /// If false, the subprocess starts with only the explicitly set variables.
194    #[must_use]
195    pub fn inherit_env(mut self, inherit: bool) -> Self {
196        self.inherit_env = inherit;
197        self
198    }
199
200    /// Sets the client capabilities to advertise to the server.
201    #[must_use]
202    pub fn capabilities(mut self, capabilities: ClientCapabilities) -> Self {
203        self.capabilities = capabilities;
204        self
205    }
206
207    /// Enables auto-initialization mode.
208    ///
209    /// When enabled, the client defers the MCP initialization handshake until
210    /// the first method call (e.g., `list_tools`, `call_tool`). This allows
211    /// the subprocess to start immediately without blocking on initialization.
212    ///
213    /// Default is `false` (initialize immediately on connect).
214    ///
215    /// # Example
216    ///
217    /// ```ignore
218    /// let client = ClientBuilder::new()
219    ///     .auto_initialize(true)
220    ///     .connect_stdio("uvx", &["my-server"])?;
221    ///
222    /// // Subprocess is running but not yet initialized
223    /// // Initialization happens on first use:
224    /// let tools = client.list_tools()?; // Initializes here
225    /// ```
226    #[must_use]
227    pub fn auto_initialize(mut self, enabled: bool) -> Self {
228        self.auto_initialize = enabled;
229        self
230    }
231
232    /// Connects to a server via stdio subprocess.
233    ///
234    /// Spawns the specified command as a subprocess and communicates via
235    /// stdin/stdout using JSON-RPC over NDJSON framing.
236    ///
237    /// # Arguments
238    ///
239    /// * `command` - The command to run (e.g., "uvx", "npx")
240    /// * `args` - Arguments to pass to the command
241    ///
242    /// # Errors
243    ///
244    /// Returns an error if:
245    /// - The subprocess fails to spawn
246    /// - The initialization handshake fails
247    /// - All retry attempts are exhausted
248    pub fn connect_stdio(self, command: &str, args: &[&str]) -> McpResult<Client> {
249        self.connect_stdio_with_cx(command, args, &Cx::for_request())
250    }
251
252    /// Connects to a server via stdio subprocess with a provided Cx.
253    ///
254    /// Same as [`connect_stdio`](Self::connect_stdio) but allows providing
255    /// a custom capability context for cancellation support.
256    pub fn connect_stdio_with_cx(self, command: &str, args: &[&str], cx: &Cx) -> McpResult<Client> {
257        let mut last_error = None;
258        // Compute attempts in u64 to avoid overflow when max_retries == u32::MAX.
259        let attempts = u64::from(self.max_retries) + 1;
260
261        for attempt in 0..attempts {
262            // Honor cancellation/budget before each attempt.
263            if cx.checkpoint().is_err() {
264                return Err(McpError::request_cancelled());
265            }
266
267            if attempt > 0 {
268                // Delay before retry while still observing cancellation.
269                // Slice sleeps so cancellation is detected promptly even for long delays.
270                let mut remaining_ms = self.retry_delay_ms;
271                while remaining_ms > 0 {
272                    if cx.checkpoint().is_err() {
273                        return Err(McpError::request_cancelled());
274                    }
275
276                    let sleep_ms = remaining_ms.min(25);
277                    std::thread::sleep(Duration::from_millis(sleep_ms));
278                    remaining_ms = remaining_ms.saturating_sub(sleep_ms);
279                }
280            }
281
282            match self.try_connect(command, args, cx) {
283                Ok(client) => return Ok(client),
284                Err(e) => {
285                    last_error = Some(e);
286                }
287            }
288        }
289
290        // All attempts failed
291        Err(last_error.unwrap_or_else(|| McpError::internal_error("Connection failed")))
292    }
293
294    /// Attempts a single connection.
295    fn try_connect(&self, command: &str, args: &[&str], cx: &Cx) -> McpResult<Client> {
296        // Build the command
297        let mut cmd = Command::new(command);
298        cmd.args(args)
299            .stdin(Stdio::piped())
300            .stdout(Stdio::piped())
301            .stderr(Stdio::inherit());
302
303        // Set working directory if specified
304        if let Some(ref dir) = self.working_dir {
305            cmd.current_dir(dir);
306        }
307
308        // Set environment
309        if !self.inherit_env {
310            cmd.env_clear();
311        }
312        for (key, value) in &self.env_vars {
313            cmd.env(key, value);
314        }
315
316        // Spawn the subprocess
317        let mut child = cmd
318            .spawn()
319            .map_err(|e| McpError::internal_error(format!("Failed to spawn subprocess: {e}")))?;
320
321        // Get stdin/stdout handles
322        let stdin = child
323            .stdin
324            .take()
325            .ok_or_else(|| McpError::internal_error("Failed to get subprocess stdin"))?;
326        let stdout = child
327            .stdout
328            .take()
329            .ok_or_else(|| McpError::internal_error("Failed to get subprocess stdout"))?;
330
331        // Create transport
332        let transport = StdioTransport::new(stdout, stdin);
333
334        if self.auto_initialize {
335            // Create uninitialized client - initialization will happen on first use
336            Ok(self.create_uninitialized_client(child, transport, cx))
337        } else {
338            // Perform initialization immediately
339            self.initialize_client(child, transport, cx)
340        }
341    }
342
343    /// Creates an uninitialized client for auto-initialize mode.
344    fn create_uninitialized_client(
345        &self,
346        child: Child,
347        transport: StdioTransport<std::process::ChildStdout, std::process::ChildStdin>,
348        cx: &Cx,
349    ) -> Client {
350        // Create a placeholder session - will be updated on first use
351        let session = ClientSession::new(
352            self.client_info.clone(),
353            self.capabilities.clone(),
354            fastmcp_protocol::ServerInfo {
355                name: String::new(),
356                version: String::new(),
357            },
358            fastmcp_protocol::ServerCapabilities::default(),
359            String::new(),
360        );
361
362        Client::from_parts_uninitialized(child, transport, cx.clone(), session, self.timeout_ms)
363    }
364
365    /// Performs the initialization handshake and creates the client.
366    fn initialize_client(
367        &self,
368        child: Child,
369        mut transport: StdioTransport<std::process::ChildStdout, std::process::ChildStdin>,
370        cx: &Cx,
371    ) -> McpResult<Client> {
372        // Guard ensures child process is killed if initialization fails.
373        // Disarmed when client is successfully created.
374        let child_guard = ChildGuard::new(child);
375
376        // Send initialize request
377        let init_params = InitializeParams {
378            protocol_version: PROTOCOL_VERSION.to_string(),
379            capabilities: self.capabilities.clone(),
380            client_info: self.client_info.clone(),
381        };
382
383        let init_request = JsonRpcRequest::new(
384            "initialize",
385            Some(serde_json::to_value(&init_params).map_err(|e| {
386                McpError::internal_error(format!("Failed to serialize params: {e}"))
387            })?),
388            1i64,
389        );
390
391        transport
392            .send(cx, &JsonRpcMessage::Request(init_request))
393            .map_err(|e| McpError::internal_error(format!("Failed to send initialize: {e}")))?;
394
395        // Receive initialize response
396        let response = loop {
397            let msg = transport.recv(cx).map_err(|e| {
398                McpError::internal_error(format!("Failed to receive response: {e}"))
399            })?;
400
401            match msg {
402                JsonRpcMessage::Response(resp) => break resp,
403                JsonRpcMessage::Request(_) => {
404                    // Ignore server requests during initialization
405                }
406            }
407        };
408
409        // Check for error
410        if let Some(error) = response.error {
411            return Err(McpError::new(
412                fastmcp_core::McpErrorCode::Custom(error.code),
413                error.message,
414            ));
415        }
416
417        // Parse result
418        let result_value = response
419            .result
420            .ok_or_else(|| McpError::internal_error("No result in initialize response"))?;
421
422        let init_result: InitializeResult = serde_json::from_value(result_value).map_err(|e| {
423            McpError::internal_error(format!("Failed to parse initialize result: {e}"))
424        })?;
425
426        // Send initialized notification
427        let initialized_request = JsonRpcRequest {
428            jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
429            method: "initialized".to_string(),
430            params: Some(serde_json::json!({})),
431            id: None,
432        };
433
434        transport
435            .send(cx, &JsonRpcMessage::Request(initialized_request))
436            .map_err(|e| McpError::internal_error(format!("Failed to send initialized: {e}")))?;
437
438        // Create session
439        let session = ClientSession::new(
440            self.client_info.clone(),
441            self.capabilities.clone(),
442            init_result.server_info,
443            init_result.capabilities,
444            init_result.protocol_version,
445        );
446
447        // Create client - disarm guard since Client now owns the subprocess
448        Ok(Client::from_parts(
449            child_guard.disarm(),
450            transport,
451            cx.clone(),
452            session,
453            self.timeout_ms,
454        ))
455    }
456}
457
458impl Default for ClientBuilder {
459    fn default() -> Self {
460        Self::new()
461    }
462}
463
464#[cfg(test)]
465mod tests {
466    use super::*;
467    use fastmcp_core::McpErrorCode;
468
469    #[test]
470    fn test_builder_defaults() {
471        let builder = ClientBuilder::new();
472        assert_eq!(builder.client_info.name, "fastmcp-client");
473        assert_eq!(builder.timeout_ms, 30_000);
474        assert_eq!(builder.max_retries, 0);
475        assert_eq!(builder.retry_delay_ms, 1_000);
476        assert!(builder.inherit_env);
477        assert!(builder.working_dir.is_none());
478        assert!(builder.env_vars.is_empty());
479        assert!(!builder.auto_initialize);
480    }
481
482    #[test]
483    fn test_builder_fluent_api() {
484        let builder = ClientBuilder::new()
485            .client_info("test-client", "2.0.0")
486            .timeout_ms(60_000)
487            .max_retries(3)
488            .retry_delay_ms(500)
489            .working_dir("/tmp")
490            .env("FOO", "bar")
491            .env("BAZ", "qux")
492            .inherit_env(false);
493
494        assert_eq!(builder.client_info.name, "test-client");
495        assert_eq!(builder.client_info.version, "2.0.0");
496        assert_eq!(builder.timeout_ms, 60_000);
497        assert_eq!(builder.max_retries, 3);
498        assert_eq!(builder.retry_delay_ms, 500);
499        assert_eq!(builder.working_dir, Some(PathBuf::from("/tmp")));
500        assert_eq!(builder.env_vars.get("FOO"), Some(&"bar".to_string()));
501        assert_eq!(builder.env_vars.get("BAZ"), Some(&"qux".to_string()));
502        assert!(!builder.inherit_env);
503    }
504
505    #[test]
506    fn test_builder_envs() {
507        let vars = [("KEY1", "value1"), ("KEY2", "value2")];
508        let builder = ClientBuilder::new().envs(vars);
509
510        assert_eq!(builder.env_vars.get("KEY1"), Some(&"value1".to_string()));
511        assert_eq!(builder.env_vars.get("KEY2"), Some(&"value2".to_string()));
512    }
513
514    #[test]
515    fn test_builder_clone() {
516        let builder1 = ClientBuilder::new()
517            .client_info("test", "1.0")
518            .timeout_ms(5000);
519
520        let builder2 = builder1.clone();
521
522        assert_eq!(builder2.client_info.name, "test");
523        assert_eq!(builder2.timeout_ms, 5000);
524    }
525
526    #[test]
527    fn test_builder_auto_initialize() {
528        let builder = ClientBuilder::new().auto_initialize(true);
529        assert!(builder.auto_initialize);
530
531        let builder = ClientBuilder::new().auto_initialize(false);
532        assert!(!builder.auto_initialize);
533    }
534
535    #[test]
536    fn test_builder_capabilities() {
537        let caps = ClientCapabilities {
538            sampling: Some(fastmcp_protocol::SamplingCapability {}),
539            elicitation: None,
540            roots: None,
541        };
542        let builder = ClientBuilder::new().capabilities(caps);
543        assert!(builder.capabilities.sampling.is_some());
544        assert!(builder.capabilities.elicitation.is_none());
545        assert!(builder.capabilities.roots.is_none());
546    }
547
548    #[test]
549    fn test_builder_default_trait() {
550        let builder = ClientBuilder::default();
551        assert_eq!(builder.client_info.name, "fastmcp-client");
552        assert_eq!(builder.timeout_ms, 30_000);
553        assert_eq!(builder.max_retries, 0);
554        assert!(!builder.auto_initialize);
555    }
556
557    #[test]
558    fn test_builder_env_override() {
559        let builder = ClientBuilder::new()
560            .env("KEY", "first")
561            .env("KEY", "second");
562        assert_eq!(builder.env_vars.get("KEY"), Some(&"second".to_string()));
563    }
564
565    #[test]
566    fn test_builder_envs_combined_with_env() {
567        let builder = ClientBuilder::new()
568            .env("A", "1")
569            .envs([("B", "2"), ("C", "3")])
570            .env("D", "4");
571        assert_eq!(builder.env_vars.len(), 4);
572        assert_eq!(builder.env_vars.get("A"), Some(&"1".to_string()));
573        assert_eq!(builder.env_vars.get("B"), Some(&"2".to_string()));
574        assert_eq!(builder.env_vars.get("C"), Some(&"3".to_string()));
575        assert_eq!(builder.env_vars.get("D"), Some(&"4".to_string()));
576    }
577
578    #[test]
579    fn test_connect_stdio_with_cx_respects_cancellation_during_retries() {
580        let cx = Cx::for_request();
581        cx.set_cancel_requested(true);
582        let result = ClientBuilder::new()
583            .max_retries(2)
584            .retry_delay_ms(100)
585            .connect_stdio_with_cx("definitely-not-a-real-command", &[], &cx);
586
587        assert!(
588            result.is_err(),
589            "cancelled context should abort before retry attempts"
590        );
591        let err = result.err().expect("error result");
592        assert_eq!(err.code, McpErrorCode::RequestCancelled);
593    }
594
595    #[test]
596    fn test_connect_stdio_with_cx_max_retries_does_not_overflow() {
597        let cx = Cx::for_request();
598        cx.set_cancel_requested(true);
599
600        let result = ClientBuilder::new()
601            .max_retries(u32::MAX)
602            .retry_delay_ms(1)
603            .connect_stdio_with_cx("definitely-not-a-real-command", &[], &cx);
604
605        assert!(
606            result.is_err(),
607            "cancelled context should return an error, not panic from retry overflow"
608        );
609        let err = result.err().expect("error result");
610        assert_eq!(err.code, McpErrorCode::RequestCancelled);
611    }
612
613    #[test]
614    fn builder_debug_includes_client_info() {
615        let builder = ClientBuilder::new().client_info("dbg-test", "0.1");
616        let debug = format!("{:?}", builder);
617        assert!(debug.contains("dbg-test"));
618        assert!(debug.contains("0.1"));
619    }
620
621    #[test]
622    fn connect_stdio_nonexistent_command_fails() {
623        let result = ClientBuilder::new()
624            .max_retries(0)
625            .connect_stdio("fastmcp_nonexistent_binary_xyz", &["--version"]);
626        assert!(result.is_err());
627    }
628
629    #[test]
630    fn builder_working_dir_last_wins() {
631        let builder = ClientBuilder::new()
632            .working_dir("/first")
633            .working_dir("/second");
634        assert_eq!(builder.working_dir, Some(PathBuf::from("/second")));
635    }
636
637    // =========================================================================
638    // Additional coverage tests (bd-10fu)
639    // =========================================================================
640
641    #[test]
642    fn child_guard_disarm_returns_child() {
643        let child = Command::new("true")
644            .stdin(Stdio::null())
645            .stdout(Stdio::null())
646            .stderr(Stdio::null())
647            .spawn()
648            .expect("failed to spawn 'true'");
649        let guard = ChildGuard::new(child);
650        let mut returned = guard.disarm();
651        // disarm gives back a valid Child we can wait on
652        let status = returned.wait().expect("wait failed");
653        assert!(status.success());
654    }
655
656    #[test]
657    fn child_guard_drop_kills_child() {
658        let child = Command::new("sleep")
659            .arg("60")
660            .stdin(Stdio::null())
661            .stdout(Stdio::null())
662            .stderr(Stdio::null())
663            .spawn()
664            .expect("failed to spawn 'sleep'");
665        let pid = child.id();
666        {
667            let _guard = ChildGuard::new(child);
668            // guard dropped here → child is killed and waited
669        }
670        // Verify the process is no longer running by trying to wait on it
671        // via /proc (Linux-specific but sufficient for CI)
672        let proc_path = format!("/proc/{}/status", pid);
673        assert!(
674            !std::path::Path::new(&proc_path).exists(),
675            "process should no longer exist after drop"
676        );
677    }
678
679    #[test]
680    fn builder_capabilities_default_is_empty() {
681        let builder = ClientBuilder::new();
682        assert!(builder.capabilities.sampling.is_none());
683        assert!(builder.capabilities.elicitation.is_none());
684        assert!(builder.capabilities.roots.is_none());
685    }
686
687    #[test]
688    fn connect_stdio_spawn_failure_error_message() {
689        let result = ClientBuilder::new()
690            .max_retries(0)
691            .connect_stdio("fastmcp_no_such_binary_abc123", &[]);
692        match result {
693            Err(err) => assert!(
694                err.message.contains("spawn"),
695                "error should mention spawn failure: {}",
696                err.message
697            ),
698            Ok(_) => panic!("expected spawn to fail"),
699        }
700    }
701}