Skip to main content

fastmcp_client/
lib.rs

1//! MCP client implementation for FastMCP.
2//!
3//! This crate provides the client-side implementation:
4//! - Client builder pattern
5//! - Tool invocation
6//! - Resource reading
7//! - Prompt fetching
8//!
9//! # Example
10//!
11//! ```ignore
12//! use fastmcp::Client;
13//!
//! // The client API is blocking, and request methods take `&mut self`.
//! let mut client = Client::stdio("uvx", &["my-mcp-server"])?;
//!
//! // List tools
//! let tools = client.list_tools()?;
//!
//! // Call a tool
//! let result = client.call_tool("greet", json!({"name": "World"}))?;
21//! ```
22//!
23//! # Role in the System
24//!
25//! `fastmcp-client` is the **companion client** to `fastmcp-server`. It uses
26//! the same protocol models and transport layer to:
27//! - Spawn MCP servers as subprocesses (stdio)
28//! - Initialize sessions and negotiate capabilities
29//! - Call tools, read resources, and fetch prompts
30//!
31//! If you are embedding FastMCP into a larger application (e.g. testing,
32//! orchestration, or local agent tooling), this is the crate that drives the
33//! client side of the protocol.
34
35#![forbid(unsafe_code)]
36#![allow(dead_code)]
37
38mod builder;
39pub mod mcp_config;
40mod session;
41
42pub use builder::ClientBuilder;
43pub use session::ClientSession;
44
45use std::process::{Child, ChildStdin, ChildStdout, Command, Stdio};
46use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
47use std::time::{Duration, Instant};
48
49use asupersync::Cx;
50use fastmcp_core::{McpError, McpResult};
51use fastmcp_protocol::{
52    CallToolParams, CallToolResult, CancelTaskParams, CancelTaskResult, CancelledParams,
53    ClientCapabilities, ClientInfo, Content, GetPromptParams, GetPromptResult, GetTaskParams,
54    GetTaskResult, InitializeParams, InitializeResult, JsonRpcMessage, JsonRpcRequest,
55    JsonRpcResponse, ListPromptsParams, ListPromptsResult, ListResourceTemplatesParams,
56    ListResourceTemplatesResult, ListResourcesParams, ListResourcesResult, ListTasksParams,
57    ListTasksResult, ListToolsParams, ListToolsResult, LogLevel, LogMessageParams,
58    PROTOCOL_VERSION, ProgressToken as ProgressMarker, Prompt, PromptMessage, ReadResourceParams,
59    ReadResourceResult, RequestId, RequestMeta, Resource, ResourceContent, ResourceTemplate,
60    ServerCapabilities, ServerInfo, SetLogLevelParams, SubmitTaskParams, SubmitTaskResult, TaskId,
61    TaskInfo, TaskResult, TaskStatus, Tool,
62};
63
/// Borrowed, mutable closure that is handed progress updates while a tool runs.
///
/// It is called with, in order: the current progress value, the total (when the
/// server supplied one), and a human-readable message (when the server supplied one).
pub type ProgressCallback<'a> = &'a mut dyn FnMut(f64, Option<f64>, Option<&str>);
68use fastmcp_transport::{StdioTransport, Transport, TransportError};
69
70#[derive(Debug, serde::Deserialize)]
71struct ClientProgressParams {
72    #[serde(rename = "progressToken")]
73    marker: ProgressMarker,
74    progress: f64,
75    total: Option<f64>,
76    message: Option<String>,
77}
78
79fn method_not_found_response(request: &JsonRpcRequest) -> Option<JsonRpcMessage> {
80    let id = request.id.clone()?;
81    let error = McpError::method_not_found(&request.method);
82    let response = JsonRpcResponse::error(Some(id), error.into());
83    Some(JsonRpcMessage::Response(response))
84}
85
86/// An MCP client instance.
87///
88/// Clients are built using [`ClientBuilder`] and can connect to servers
89/// via various transports (stdio subprocess, SSE, WebSocket).
90pub struct Client {
91    /// The subprocess running the MCP server.
92    child: Child,
93    /// Transport for communication.
94    transport: StdioTransport<ChildStdout, ChildStdin>,
95    /// Capability context for cancellation.
96    cx: Cx,
97    /// Session state after initialization.
98    session: ClientSession,
99    /// Request ID counter.
100    next_id: AtomicU64,
101    /// Request timeout in milliseconds (0 = no timeout).
102    timeout_ms: u64,
103    /// Whether auto-initialization is enabled (for documentation/debugging).
104    #[allow(dead_code)]
105    auto_initialize: bool,
106    /// Whether the client has been initialized.
107    initialized: AtomicBool,
108}
109
110impl Client {
111    /// Creates a client connecting to a subprocess via stdio.
112    ///
113    /// # Arguments
114    ///
115    /// * `command` - The command to run (e.g., "uvx", "npx")
116    /// * `args` - Arguments to pass to the command
117    ///
118    /// # Errors
119    ///
120    /// Returns an error if the subprocess fails to start or initialization fails.
121    pub fn stdio(command: &str, args: &[&str]) -> McpResult<Self> {
122        Self::stdio_with_cx(command, args, Cx::for_testing())
123    }
124
125    /// Creates a client with a provided Cx for cancellation support.
126    pub fn stdio_with_cx(command: &str, args: &[&str], cx: Cx) -> McpResult<Self> {
127        // Spawn the subprocess
128        let mut child = Command::new(command)
129            .args(args)
130            .stdin(Stdio::piped())
131            .stdout(Stdio::piped())
132            .stderr(Stdio::inherit())
133            .spawn()
134            .map_err(|e| McpError::internal_error(format!("Failed to spawn subprocess: {e}")))?;
135
136        // Get stdin/stdout handles
137        let stdin = child
138            .stdin
139            .take()
140            .ok_or_else(|| McpError::internal_error("Failed to get subprocess stdin"))?;
141        let stdout = child
142            .stdout
143            .take()
144            .ok_or_else(|| McpError::internal_error("Failed to get subprocess stdout"))?;
145
146        // Create transport
147        let transport = StdioTransport::new(stdout, stdin);
148
149        // Create client info
150        let client_info = ClientInfo {
151            name: "fastmcp-client".to_owned(),
152            version: env!("CARGO_PKG_VERSION").to_owned(),
153        };
154        let client_capabilities = ClientCapabilities::default();
155
156        // Create a temporary client for initialization
157        let mut client = Self {
158            child,
159            transport,
160            cx,
161            session: ClientSession::new(
162                client_info.clone(),
163                client_capabilities.clone(),
164                ServerInfo {
165                    name: String::new(),
166                    version: String::new(),
167                },
168                ServerCapabilities::default(),
169                String::new(),
170            ),
171            next_id: AtomicU64::new(1),
172            timeout_ms: 30_000, // Default 30 second timeout
173            auto_initialize: false,
174            initialized: AtomicBool::new(false),
175        };
176
177        // Perform initialization handshake
178        let init_result = client.initialize(client_info, client_capabilities)?;
179
180        // Update session with server response
181        client.session = ClientSession::new(
182            client.session.client_info().clone(),
183            client.session.client_capabilities().clone(),
184            init_result.server_info,
185            init_result.capabilities,
186            init_result.protocol_version,
187        );
188
189        // Send initialized notification
190        client.send_notification("initialized", serde_json::json!({}))?;
191
192        // Mark as initialized
193        client.initialized.store(true, Ordering::SeqCst);
194
195        Ok(client)
196    }
197
198    /// Creates a new client builder.
199    #[must_use]
200    pub fn builder() -> ClientBuilder {
201        ClientBuilder::new()
202    }
203
204    /// Creates a client from its component parts.
205    ///
206    /// This is an internal constructor used by the builder.
207    pub(crate) fn from_parts(
208        child: Child,
209        transport: StdioTransport<ChildStdout, ChildStdin>,
210        cx: Cx,
211        session: ClientSession,
212        timeout_ms: u64,
213    ) -> Self {
214        Self {
215            child,
216            transport,
217            cx,
218            session,
219            next_id: AtomicU64::new(2), // Start at 2 since initialize used 1
220            timeout_ms,
221            auto_initialize: false,
222            initialized: AtomicBool::new(true), // Already initialized by builder
223        }
224    }
225
226    /// Creates an uninitialized client for auto-initialize mode.
227    ///
228    /// This is an internal constructor used by the builder when auto_initialize is enabled.
229    pub(crate) fn from_parts_uninitialized(
230        child: Child,
231        transport: StdioTransport<ChildStdout, ChildStdin>,
232        cx: Cx,
233        session: ClientSession,
234        timeout_ms: u64,
235    ) -> Self {
236        Self {
237            child,
238            transport,
239            cx,
240            session,
241            next_id: AtomicU64::new(1), // Start at 1 since initialize hasn't happened
242            timeout_ms,
243            auto_initialize: true,
244            initialized: AtomicBool::new(false),
245        }
246    }
247
248    /// Ensures the client is initialized.
249    ///
250    /// In auto-initialize mode, this performs the initialization handshake on first call.
251    /// In normal mode, this is a no-op since the client is already initialized.
252    ///
253    /// Since this method takes `&mut self`, Rust's borrowing rules guarantee exclusive
254    /// access, so no additional synchronization is needed.
255    ///
256    /// # Errors
257    ///
258    /// Returns an error if initialization fails.
259    pub fn ensure_initialized(&mut self) -> McpResult<()> {
260        // Already initialized - nothing to do
261        if self.initialized.load(Ordering::SeqCst) {
262            return Ok(());
263        }
264
265        // Perform initialization
266        let client_info = self.session.client_info().clone();
267        let capabilities = self.session.client_capabilities().clone();
268        let init_result = self.initialize(client_info, capabilities)?;
269
270        // Update session with server response
271        self.session = ClientSession::new(
272            self.session.client_info().clone(),
273            self.session.client_capabilities().clone(),
274            init_result.server_info,
275            init_result.capabilities,
276            init_result.protocol_version,
277        );
278
279        // Send initialized notification
280        self.send_notification("initialized", serde_json::json!({}))?;
281
282        // Mark as initialized
283        self.initialized.store(true, Ordering::SeqCst);
284
285        Ok(())
286    }
287
288    /// Returns whether the client has been initialized.
289    #[must_use]
290    pub fn is_initialized(&self) -> bool {
291        self.initialized.load(Ordering::SeqCst)
292    }
293
294    /// Returns the server info after initialization.
295    #[must_use]
296    pub fn server_info(&self) -> &ServerInfo {
297        self.session.server_info()
298    }
299
300    /// Returns the server capabilities after initialization.
301    #[must_use]
302    pub fn server_capabilities(&self) -> &ServerCapabilities {
303        self.session.server_capabilities()
304    }
305
306    /// Returns the protocol version negotiated during initialization.
307    #[must_use]
308    pub fn protocol_version(&self) -> &str {
309        self.session.protocol_version()
310    }
311
312    /// Generates the next request ID.
313    fn next_request_id(&self) -> u64 {
314        self.next_id.fetch_add(1, Ordering::SeqCst)
315    }
316
317    /// Sends a request and waits for response.
318    fn send_request<P: serde::Serialize, R: serde::de::DeserializeOwned>(
319        &mut self,
320        method: &str,
321        params: P,
322    ) -> McpResult<R> {
323        let id = self.next_request_id();
324        let params_value = serde_json::to_value(params)
325            .map_err(|e| McpError::internal_error(format!("Failed to serialize params: {e}")))?;
326
327        #[allow(clippy::cast_possible_wrap)]
328        let (request_id, request) = {
329            let id_i64 = id as i64;
330            (
331                RequestId::Number(id_i64),
332                JsonRpcRequest::new(method, Some(params_value), id_i64),
333            )
334        };
335
336        // Send request
337        self.transport
338            .send(&self.cx, &JsonRpcMessage::Request(request))
339            .map_err(transport_error_to_mcp)?;
340
341        // Receive response with ID validation
342        let response = self.recv_response(&request_id)?;
343
344        // Check for error response
345        if let Some(error) = response.error {
346            return Err(McpError::new(
347                fastmcp_core::McpErrorCode::from(error.code),
348                error.message,
349            ));
350        }
351
352        // Parse result
353        let result = response
354            .result
355            .ok_or_else(|| McpError::internal_error("No result in response"))?;
356
357        serde_json::from_value(result)
358            .map_err(|e| McpError::internal_error(format!("Failed to deserialize response: {e}")))
359    }
360
361    /// Sends a notification (no response expected).
362    fn send_notification<P: serde::Serialize>(&mut self, method: &str, params: P) -> McpResult<()> {
363        let params_value = serde_json::to_value(params)
364            .map_err(|e| McpError::internal_error(format!("Failed to serialize params: {e}")))?;
365
366        // Create a notification (request without id)
367        let request = JsonRpcRequest {
368            jsonrpc: std::borrow::Cow::Borrowed(fastmcp_protocol::JSONRPC_VERSION),
369            method: method.to_string(),
370            params: Some(params_value),
371            id: None,
372        };
373
374        self.transport
375            .send(&self.cx, &JsonRpcMessage::Request(request))
376            .map_err(transport_error_to_mcp)?;
377
378        Ok(())
379    }
380
381    /// Sends a cancellation notification for a previously issued request.
382    ///
383    /// Set `await_cleanup` to request that the server wait for any cleanup
384    /// before acknowledging completion (best-effort; server-dependent).
385    ///
386    /// # Errors
387    ///
388    /// Returns an error if the notification cannot be sent.
389    pub fn cancel_request(
390        &mut self,
391        request_id: impl Into<RequestId>,
392        reason: Option<String>,
393        await_cleanup: bool,
394    ) -> McpResult<()> {
395        let params = CancelledParams {
396            request_id: request_id.into(),
397            reason,
398            await_cleanup: if await_cleanup { Some(true) } else { None },
399        };
400        self.send_notification("notifications/cancelled", params)
401    }
402
403    /// Receives a response from the transport, validating the response ID.
404    fn recv_response(
405        &mut self,
406        expected_id: &RequestId,
407    ) -> McpResult<fastmcp_protocol::JsonRpcResponse> {
408        // Calculate deadline if timeout is configured
409        let deadline = if self.timeout_ms > 0 {
410            Some(Instant::now() + Duration::from_millis(self.timeout_ms))
411        } else {
412            None
413        };
414
415        loop {
416            // Check timeout before each recv attempt
417            if let Some(deadline) = deadline {
418                if Instant::now() >= deadline {
419                    return Err(McpError::internal_error("Request timed out"));
420                }
421            }
422
423            let message = self
424                .transport
425                .recv(&self.cx)
426                .map_err(transport_error_to_mcp)?;
427
428            match message {
429                JsonRpcMessage::Response(response) => {
430                    // Validate response ID matches the expected request ID
431                    if let Some(ref id) = response.id {
432                        if id != expected_id {
433                            // This response is for a different request; continue waiting.
434                            // (Could queue it for later, but for simplicity we discard.)
435                            continue;
436                        }
437                    }
438                    return Ok(response);
439                }
440                JsonRpcMessage::Request(request) => {
441                    // Server sending a request to client (e.g., notification)
442                    if request.method == "notifications/message" {
443                        if let Some(params) = request.params.as_ref() {
444                            if let Ok(message) =
445                                serde_json::from_value::<LogMessageParams>(params.clone())
446                            {
447                                self.emit_log_message(message);
448                            }
449                        }
450                    }
451
452                    if let Some(response) = method_not_found_response(&request) {
453                        self.transport
454                            .send(&self.cx, &response)
455                            .map_err(transport_error_to_mcp)?;
456                    }
457                }
458            }
459        }
460    }
461
462    /// Performs the initialization handshake.
463    fn initialize(
464        &mut self,
465        client_info: ClientInfo,
466        capabilities: ClientCapabilities,
467    ) -> McpResult<InitializeResult> {
468        let params = InitializeParams {
469            protocol_version: PROTOCOL_VERSION.to_string(),
470            capabilities,
471            client_info,
472        };
473
474        self.send_request("initialize", params)
475    }
476
477    /// Lists available tools.
478    ///
479    /// # Errors
480    ///
481    /// Returns an error if the request fails.
482    pub fn list_tools(&mut self) -> McpResult<Vec<Tool>> {
483        self.ensure_initialized()?;
484        let params = ListToolsParams::default();
485        let result: ListToolsResult = self.send_request("tools/list", params)?;
486        Ok(result.tools)
487    }
488
489    /// Calls a tool with the given arguments.
490    ///
491    /// # Errors
492    ///
493    /// Returns an error if the tool call fails.
494    pub fn call_tool(
495        &mut self,
496        name: &str,
497        arguments: serde_json::Value,
498    ) -> McpResult<Vec<Content>> {
499        self.ensure_initialized()?;
500        let params = CallToolParams {
501            name: name.to_string(),
502            arguments: Some(arguments),
503            meta: None,
504        };
505        let result: CallToolResult = self.send_request("tools/call", params)?;
506
507        if result.is_error {
508            // Extract error message from content if available
509            let error_msg = result
510                .content
511                .first()
512                .and_then(|c| match c {
513                    Content::Text { text } => Some(text.clone()),
514                    _ => None,
515                })
516                .unwrap_or_else(|| "Tool execution failed".to_string());
517            return Err(McpError::tool_error(error_msg));
518        }
519
520        Ok(result.content)
521    }
522
523    /// Calls a tool with progress callback support.
524    ///
525    /// This method allows you to receive progress notifications during tool execution.
526    /// The callback is invoked for each progress notification received from the server.
527    ///
528    /// # Arguments
529    ///
530    /// * `name` - The tool name to call
531    /// * `arguments` - The tool arguments as JSON
532    /// * `on_progress` - Callback invoked for each progress notification
533    ///
534    /// # Errors
535    ///
536    /// Returns an error if the tool call fails.
537    pub fn call_tool_with_progress(
538        &mut self,
539        name: &str,
540        arguments: serde_json::Value,
541        on_progress: ProgressCallback<'_>,
542    ) -> McpResult<Vec<Content>> {
543        self.ensure_initialized()?;
544        // Generate a unique request ID and reuse it as the progress token.
545        let request_id = self.next_request_id();
546        #[allow(clippy::cast_possible_wrap)]
547        let progress_marker = ProgressMarker::Number(request_id as i64);
548
549        let params = CallToolParams {
550            name: name.to_string(),
551            arguments: Some(arguments),
552            meta: Some(RequestMeta {
553                progress_token: Some(progress_marker.clone()),
554            }),
555        };
556
557        let result: CallToolResult = self.send_request_with_progress(
558            "tools/call",
559            params,
560            request_id,
561            &progress_marker,
562            on_progress,
563        )?;
564
565        if result.is_error {
566            // Extract error message from content if available
567            let error_msg = result
568                .content
569                .first()
570                .and_then(|c| match c {
571                    Content::Text { text } => Some(text.clone()),
572                    _ => None,
573                })
574                .unwrap_or_else(|| "Tool execution failed".to_string());
575            return Err(McpError::tool_error(error_msg));
576        }
577
578        Ok(result.content)
579    }
580
581    /// Sends a request and waits for response, handling progress notifications.
582    fn send_request_with_progress<P: serde::Serialize, R: serde::de::DeserializeOwned>(
583        &mut self,
584        method: &str,
585        params: P,
586        request_id: u64,
587        expected_marker: &ProgressMarker,
588        on_progress: ProgressCallback<'_>,
589    ) -> McpResult<R> {
590        let params_value = serde_json::to_value(params)
591            .map_err(|e| McpError::internal_error(format!("Failed to serialize params: {e}")))?;
592
593        #[allow(clippy::cast_possible_wrap)]
594        let request = JsonRpcRequest::new(method, Some(params_value), request_id as i64);
595
596        // Send request
597        self.transport
598            .send(&self.cx, &JsonRpcMessage::Request(request))
599            .map_err(transport_error_to_mcp)?;
600
601        // Receive response, handling progress notifications
602        let response = self.recv_response_with_progress(expected_marker, on_progress)?;
603
604        // Check for error response
605        if let Some(error) = response.error {
606            return Err(McpError::new(
607                fastmcp_core::McpErrorCode::from(error.code),
608                error.message,
609            ));
610        }
611
612        // Parse result
613        let result = response
614            .result
615            .ok_or_else(|| McpError::internal_error("No result in response"))?;
616
617        serde_json::from_value(result)
618            .map_err(|e| McpError::internal_error(format!("Failed to deserialize response: {e}")))
619    }
620
621    /// Receives a response from the transport, handling progress notifications.
622    fn recv_response_with_progress(
623        &mut self,
624        expected_marker: &ProgressMarker,
625        on_progress: ProgressCallback<'_>,
626    ) -> McpResult<fastmcp_protocol::JsonRpcResponse> {
627        // Calculate deadline if timeout is configured
628        let deadline = if self.timeout_ms > 0 {
629            Some(Instant::now() + Duration::from_millis(self.timeout_ms))
630        } else {
631            None
632        };
633
634        loop {
635            // Check timeout before each recv attempt
636            if let Some(deadline) = deadline {
637                if Instant::now() >= deadline {
638                    return Err(McpError::internal_error("Request timed out"));
639                }
640            }
641
642            let message = self
643                .transport
644                .recv(&self.cx)
645                .map_err(transport_error_to_mcp)?;
646
647            match message {
648                JsonRpcMessage::Response(response) => return Ok(response),
649                JsonRpcMessage::Request(request) => {
650                    // Check if this is a progress notification
651                    if request.method == "notifications/progress" {
652                        if let Some(params) = request.params.as_ref() {
653                            if let Ok(progress) =
654                                serde_json::from_value::<ClientProgressParams>(params.clone())
655                            {
656                                // Only handle progress for our expected marker
657                                if progress.marker == *expected_marker {
658                                    on_progress(
659                                        progress.progress,
660                                        progress.total,
661                                        progress.message.as_deref(),
662                                    );
663                                }
664                            }
665                        }
666                    } else if request.method == "notifications/message" {
667                        if let Some(params) = request.params.as_ref() {
668                            if let Ok(message) =
669                                serde_json::from_value::<LogMessageParams>(params.clone())
670                            {
671                                self.emit_log_message(message);
672                            }
673                        }
674                    }
675
676                    if let Some(response) = method_not_found_response(&request) {
677                        self.transport
678                            .send(&self.cx, &response)
679                            .map_err(transport_error_to_mcp)?;
680                    }
681                    // Continue waiting for actual response
682                }
683            }
684        }
685    }
686
687    fn emit_log_message(&self, message: LogMessageParams) {
688        let level = match message.level {
689            LogLevel::Debug => log::Level::Debug,
690            LogLevel::Info => log::Level::Info,
691            LogLevel::Warning => log::Level::Warn,
692            LogLevel::Error => log::Level::Error,
693        };
694
695        let target = message.logger.as_deref().unwrap_or("fastmcp::remote");
696        let text = match message.data {
697            serde_json::Value::String(s) => s,
698            other => other.to_string(),
699        };
700
701        log::log!(target: target, level, "{text}");
702    }
703
704    /// Lists available resources.
705    ///
706    /// # Errors
707    ///
708    /// Returns an error if the request fails.
709    pub fn list_resources(&mut self) -> McpResult<Vec<Resource>> {
710        self.ensure_initialized()?;
711        let params = ListResourcesParams::default();
712        let result: ListResourcesResult = self.send_request("resources/list", params)?;
713        Ok(result.resources)
714    }
715
716    /// Lists available resource templates.
717    ///
718    /// # Errors
719    ///
720    /// Returns an error if the request fails.
721    pub fn list_resource_templates(&mut self) -> McpResult<Vec<ResourceTemplate>> {
722        self.ensure_initialized()?;
723        let params = ListResourceTemplatesParams::default();
724        let result: ListResourceTemplatesResult =
725            self.send_request("resources/templates/list", params)?;
726        Ok(result.resource_templates)
727    }
728
729    /// Sets the server log level (if supported).
730    ///
731    /// # Errors
732    ///
733    /// Returns an error if the request fails.
734    pub fn set_log_level(&mut self, level: LogLevel) -> McpResult<()> {
735        self.ensure_initialized()?;
736        let params = SetLogLevelParams { level };
737        let _: serde_json::Value = self.send_request("logging/setLevel", params)?;
738        Ok(())
739    }
740
741    /// Reads a resource by URI.
742    ///
743    /// # Errors
744    ///
745    /// Returns an error if the resource cannot be read.
746    pub fn read_resource(&mut self, uri: &str) -> McpResult<Vec<ResourceContent>> {
747        self.ensure_initialized()?;
748        let params = ReadResourceParams {
749            uri: uri.to_string(),
750            meta: None,
751        };
752        let result: ReadResourceResult = self.send_request("resources/read", params)?;
753        Ok(result.contents)
754    }
755
756    /// Lists available prompts.
757    ///
758    /// # Errors
759    ///
760    /// Returns an error if the request fails.
761    pub fn list_prompts(&mut self) -> McpResult<Vec<Prompt>> {
762        self.ensure_initialized()?;
763        let params = ListPromptsParams::default();
764        let result: ListPromptsResult = self.send_request("prompts/list", params)?;
765        Ok(result.prompts)
766    }
767
768    /// Gets a prompt with the given arguments.
769    ///
770    /// # Errors
771    ///
772    /// Returns an error if the prompt cannot be retrieved.
773    pub fn get_prompt(
774        &mut self,
775        name: &str,
776        arguments: std::collections::HashMap<String, String>,
777    ) -> McpResult<Vec<PromptMessage>> {
778        self.ensure_initialized()?;
779        let params = GetPromptParams {
780            name: name.to_string(),
781            arguments: if arguments.is_empty() {
782                None
783            } else {
784                Some(arguments)
785            },
786            meta: None,
787        };
788        let result: GetPromptResult = self.send_request("prompts/get", params)?;
789        Ok(result.messages)
790    }
791
792    // ═══════════════════════════════════════════════════════════════════════
793    // Task Management (Docket/SEP-1686)
794    // ═══════════════════════════════════════════════════════════════════════
795
796    /// Submits a background task for execution.
797    ///
798    /// # Arguments
799    ///
800    /// * `task_type` - The type of task to execute (e.g., "data_export", "batch_process")
801    /// * `input` - Task parameters as JSON
802    ///
803    /// # Errors
804    ///
805    /// Returns an error if the server doesn't support tasks or the request fails.
806    pub fn submit_task(
807        &mut self,
808        task_type: &str,
809        input: serde_json::Value,
810    ) -> McpResult<TaskInfo> {
811        self.ensure_initialized()?;
812        let params = SubmitTaskParams {
813            task_type: task_type.to_string(),
814            params: Some(input),
815        };
816        let result: SubmitTaskResult = self.send_request("tasks/submit", params)?;
817        Ok(result.task)
818    }
819
820    /// Lists tasks with optional status filter.
821    ///
822    /// # Arguments
823    ///
824    /// * `status` - Optional filter by task status
825    /// * `cursor` - Optional pagination cursor from previous response
826    ///
827    /// # Errors
828    ///
829    /// Returns an error if the server doesn't support tasks or the request fails.
830    pub fn list_tasks(
831        &mut self,
832        status: Option<TaskStatus>,
833        cursor: Option<&str>,
834    ) -> McpResult<ListTasksResult> {
835        self.ensure_initialized()?;
836        let params = ListTasksParams {
837            cursor: cursor.map(ToString::to_string),
838            status,
839        };
840        self.send_request("tasks/list", params)
841    }
842
843    /// Gets detailed information about a specific task.
844    ///
845    /// # Arguments
846    ///
847    /// * `task_id` - The task ID to retrieve
848    ///
849    /// # Errors
850    ///
851    /// Returns an error if the task is not found or the request fails.
852    pub fn get_task(&mut self, task_id: &str) -> McpResult<GetTaskResult> {
853        self.ensure_initialized()?;
854        let params = GetTaskParams {
855            id: TaskId::from_string(task_id),
856        };
857        self.send_request("tasks/get", params)
858    }
859
860    /// Cancels a running or pending task.
861    ///
862    /// # Arguments
863    ///
864    /// * `task_id` - The task ID to cancel
865    ///
866    /// # Errors
867    ///
868    /// Returns an error if the task cannot be cancelled or is already complete.
869    pub fn cancel_task(&mut self, task_id: &str) -> McpResult<TaskInfo> {
870        self.cancel_task_with_reason(task_id, None)
871    }
872
873    /// Cancels a running or pending task with an optional reason.
874    ///
875    /// # Arguments
876    ///
877    /// * `task_id` - The task ID to cancel
878    /// * `reason` - Optional reason for the cancellation
879    ///
880    /// # Errors
881    ///
882    /// Returns an error if the task cannot be cancelled or is already complete.
883    pub fn cancel_task_with_reason(
884        &mut self,
885        task_id: &str,
886        reason: Option<&str>,
887    ) -> McpResult<TaskInfo> {
888        self.ensure_initialized()?;
889        let params = CancelTaskParams {
890            id: TaskId::from_string(task_id),
891            reason: reason.map(ToString::to_string),
892        };
893        let result: CancelTaskResult = self.send_request("tasks/cancel", params)?;
894        Ok(result.task)
895    }
896
897    /// Waits for a task to complete by polling.
898    ///
899    /// This method polls the server at the specified interval until the task
900    /// reaches a terminal state (completed, failed, or cancelled).
901    ///
902    /// # Arguments
903    ///
904    /// * `task_id` - The task ID to wait for
905    /// * `poll_interval` - Duration between poll requests
906    ///
907    /// # Errors
908    ///
909    /// Returns an error if the task fails, is cancelled, or the request fails.
910    pub fn wait_for_task(
911        &mut self,
912        task_id: &str,
913        poll_interval: Duration,
914    ) -> McpResult<TaskResult> {
915        loop {
916            let result = self.get_task(task_id)?;
917
918            // Check if task is complete
919            if result.task.status.is_terminal() {
920                // If task has a result, return it
921                if let Some(task_result) = result.result {
922                    return Ok(task_result);
923                }
924
925                // Task is terminal but no result - create one from the task info
926                return Ok(TaskResult {
927                    id: result.task.id,
928                    success: result.task.status == TaskStatus::Completed,
929                    data: None,
930                    error: result.task.error,
931                });
932            }
933
934            // Sleep before next poll
935            std::thread::sleep(poll_interval);
936        }
937    }
938
939    /// Waits for a task with progress callback.
940    ///
941    /// Similar to `wait_for_task` but also provides progress information via callback.
942    ///
943    /// # Arguments
944    ///
945    /// * `task_id` - The task ID to wait for
946    /// * `poll_interval` - Duration between poll requests
947    /// * `on_progress` - Callback invoked with progress updates
948    ///
949    /// # Errors
950    ///
951    /// Returns an error if the task fails, is cancelled, or the request fails.
952    pub fn wait_for_task_with_progress<F>(
953        &mut self,
954        task_id: &str,
955        poll_interval: Duration,
956        mut on_progress: F,
957    ) -> McpResult<TaskResult>
958    where
959        F: FnMut(f64, Option<&str>),
960    {
961        loop {
962            let result = self.get_task(task_id)?;
963
964            // Report progress if available
965            if let Some(progress) = result.task.progress {
966                on_progress(progress, result.task.message.as_deref());
967            }
968
969            // Check if task is complete
970            if result.task.status.is_terminal() {
971                // If task has a result, return it
972                if let Some(task_result) = result.result {
973                    return Ok(task_result);
974                }
975
976                // Task is terminal but no result - create one from the task info
977                return Ok(TaskResult {
978                    id: result.task.id,
979                    success: result.task.status == TaskStatus::Completed,
980                    data: None,
981                    error: result.task.error,
982                });
983            }
984
985            // Sleep before next poll
986            std::thread::sleep(poll_interval);
987        }
988    }
989
990    /// Closes the client connection.
991    pub fn close(mut self) {
992        // Close the transport
993        let _ = self.transport.close();
994
995        // Kill the subprocess if still running
996        let _ = self.child.kill();
997        let _ = self.child.wait();
998    }
999}
1000
1001impl Drop for Client {
1002    fn drop(&mut self) {
1003        // Ensure subprocess is cleaned up even if close() wasn't called.
1004        // Ignore errors since we're in drop - best effort cleanup.
1005        let _ = self.transport.close();
1006        let _ = self.child.kill();
1007        let _ = self.child.wait();
1008    }
1009}
1010
1011/// Converts a TransportError to McpError.
1012fn transport_error_to_mcp(e: TransportError) -> McpError {
1013    match e {
1014        TransportError::Cancelled => McpError::request_cancelled(),
1015        TransportError::Closed => McpError::internal_error("Transport closed"),
1016        TransportError::Timeout => McpError::internal_error("Request timed out"),
1017        TransportError::Io(io_err) => McpError::internal_error(format!("I/O error: {io_err}")),
1018        TransportError::Codec(codec_err) => {
1019            McpError::internal_error(format!("Codec error: {codec_err}"))
1020        }
1021    }
1022}
1023
#[cfg(test)]
mod tests {
    use super::*;

    // ========================================
    // method_not_found_response tests
    // ========================================
    //
    // Each test that inspects the response first asserts that it really is a
    // `JsonRpcMessage::Response`. Without that, the `if let` below would skip
    // every assertion and the test would pass vacuously on any other variant.

    #[test]
    fn method_not_found_response_for_request() {
        let request = JsonRpcRequest::new("sampling/createMessage", None, "req-1");
        let response = method_not_found_response(&request);
        assert!(
            matches!(response, Some(JsonRpcMessage::Response(_))),
            "a request with an id must be answered with a response"
        );
        if let Some(JsonRpcMessage::Response(resp)) = response {
            assert!(matches!(
                resp.error.as_ref(),
                Some(error)
                    if error.code == i32::from(fastmcp_core::McpErrorCode::MethodNotFound)
            ));
            assert_eq!(resp.id, Some(RequestId::String("req-1".to_string())));
        }
    }

    #[test]
    fn method_not_found_response_for_notification() {
        // Notifications carry no id, so there is nothing to reply to.
        let request = JsonRpcRequest::notification("notifications/message", None);
        let response = method_not_found_response(&request);
        assert!(response.is_none());
    }

    #[test]
    fn method_not_found_response_with_numeric_id() {
        let request = JsonRpcRequest::new("unknown/method", None, 42i64);
        let response = method_not_found_response(&request);
        assert!(
            matches!(response, Some(JsonRpcMessage::Response(_))),
            "a request with an id must be answered with a response"
        );
        if let Some(JsonRpcMessage::Response(resp)) = response {
            assert_eq!(resp.id, Some(RequestId::Number(42)));
            let error = resp.error.as_ref().unwrap();
            assert_eq!(
                error.code,
                i32::from(fastmcp_core::McpErrorCode::MethodNotFound)
            );
            assert!(error.message.contains("unknown/method"));
        }
    }

    #[test]
    fn method_not_found_response_with_params() {
        let params = serde_json::json!({"key": "value"});
        let request = JsonRpcRequest::new("roots/list", Some(params), "req-99");
        let response = method_not_found_response(&request);
        assert!(
            matches!(response, Some(JsonRpcMessage::Response(_))),
            "a request with an id must be answered with a response"
        );
        if let Some(JsonRpcMessage::Response(resp)) = response {
            let error = resp.error.as_ref().unwrap();
            assert!(error.message.contains("roots/list"));
        }
    }

    // ========================================
    // transport_error_to_mcp tests
    // ========================================

    #[test]
    fn transport_error_cancelled_maps_to_request_cancelled() {
        let err = transport_error_to_mcp(TransportError::Cancelled);
        assert_eq!(err.code, fastmcp_core::McpErrorCode::RequestCancelled);
    }

    #[test]
    fn transport_error_closed_maps_to_internal() {
        let err = transport_error_to_mcp(TransportError::Closed);
        assert_eq!(err.code, fastmcp_core::McpErrorCode::InternalError);
        assert!(err.message.contains("closed"));
    }

    #[test]
    fn transport_error_timeout_maps_to_internal() {
        let err = transport_error_to_mcp(TransportError::Timeout);
        assert_eq!(err.code, fastmcp_core::McpErrorCode::InternalError);
        assert!(err.message.contains("timed out"));
    }

    #[test]
    fn transport_error_io_maps_to_internal() {
        let io_err = std::io::Error::new(std::io::ErrorKind::BrokenPipe, "pipe broken");
        let err = transport_error_to_mcp(TransportError::Io(io_err));
        assert_eq!(err.code, fastmcp_core::McpErrorCode::InternalError);
        assert!(err.message.contains("I/O error"));
    }

    #[test]
    fn transport_error_codec_maps_to_internal() {
        use fastmcp_transport::CodecError;
        let codec_err = CodecError::MessageTooLarge(999_999);
        let err = transport_error_to_mcp(TransportError::Codec(codec_err));
        assert_eq!(err.code, fastmcp_core::McpErrorCode::InternalError);
        assert!(err.message.contains("Codec error"));
    }

    // ========================================
    // ClientProgressParams tests
    // ========================================

    #[test]
    fn client_progress_params_deserialization() {
        // All fields present, numeric progress token.
        let json = serde_json::json!({
            "progressToken": 42,
            "progress": 0.5,
            "total": 1.0,
            "message": "Halfway done"
        });
        let params: ClientProgressParams = serde_json::from_value(json).unwrap();
        assert_eq!(params.marker, ProgressMarker::Number(42));
        assert!((params.progress - 0.5).abs() < f64::EPSILON);
        assert!((params.total.unwrap() - 1.0).abs() < f64::EPSILON);
        assert_eq!(params.message.as_deref(), Some("Halfway done"));
    }

    #[test]
    fn client_progress_params_minimal() {
        // Only the required fields, string progress token.
        let json = serde_json::json!({
            "progressToken": "tok-1",
            "progress": 0.0
        });
        let params: ClientProgressParams = serde_json::from_value(json).unwrap();
        assert_eq!(params.marker, ProgressMarker::String("tok-1".to_string()));
        assert!(params.total.is_none());
        assert!(params.message.is_none());
    }
}