kodegen_mcp_client 0.3.0

KODEGEN.ᴀɪ: Database query and schema exploration MCP tools for AI agents.
Documentation
use rmcp::{
    RoleClient,
    model::{CallToolRequestParam, CallToolResult, ClientInfo, InitializeResult},
    service::{Peer, RunningService},
};
use tokio::time::{Duration, timeout};

pub mod error;
pub mod responses;
pub mod transports;
pub mod validation;

pub use error::{ClientError, TransportType};
pub use transports::{StdioClientBuilder, create_stdio_client, create_streamable_client};

/// Get human-readable JSON type name for error messages
fn json_type_name(value: &serde_json::Value) -> &'static str {
    match value {
        serde_json::Value::Null => "null",
        serde_json::Value::Bool(_) => "boolean",
        serde_json::Value::Number(_) => "number",
        serde_json::Value::String(_) => "string",
        serde_json::Value::Array(_) => "array",
        serde_json::Value::Object(_) => "object",
    }
}

/// Default timeout for MCP operations (30 seconds)
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);

/// Cheap-to-clone client handle for MCP operations
///
/// This handle can be cloned freely and shared across tasks/threads. The handle
/// wraps a `Peer<RoleClient>` from the `rmcp` crate, which internally contains
/// four Arc pointers. Cloning only copies these Arc pointers (16 bytes each on
/// 64-bit systems) and increments reference counts atomically.
///
/// ## Thread Safety
///
/// `KodegenClient` is both `Send` and `Sync`, making it safe to share across threads:
///
/// ```ignore
/// let client2 = client.clone();
/// tokio::spawn(async move {
///     client2.call_tool("my_tool", args).await
/// });
/// ```
///
/// ## Timeout Behavior
///
/// Each client handle maintains its own timeout value. Cloning a client creates
/// a new handle with a **copy** of the current timeout, but changes to one handle's
/// timeout do not affect other handles:
///
/// ```ignore
/// let client1 = client.clone();
/// let client2 = client.with_timeout(Duration::from_secs(60));
/// // client1 still has default 30s timeout
/// // client2 has 60s timeout
/// ```
///
/// To set timeout for all clients, call `with_timeout()` before any `clone()`.
///
/// ## Performance
///
/// - Clone cost: ~48 bytes + 4 atomic increments (near-zero overhead)
/// - Memory per client: ~32 bytes (Arc pointers + Duration)
/// - No limit on number of clones (uses standard Arc reference counting)
/// - All clones share the same underlying MCP connection
///
/// ## Relationship to KodegenConnection
///
/// While the client handle can be cloned freely, the underlying connection
/// is managed by `KodegenConnection`. When the connection is dropped, all
/// client handles become invalid:
///
/// ```ignore
/// let (client, conn) = create_http_client(url).await?;
/// let client2 = client.clone();
/// drop(conn);  // Connection closed
/// client2.list_tools().await?;  // Error: connection closed
/// ```
#[derive(Clone)]
pub struct KodegenClient {
    peer: Peer<RoleClient>,
    default_timeout: Duration,
}

impl KodegenClient {
    /// Create a client from a peer (internal use)
    pub(crate) fn from_peer(peer: Peer<RoleClient>) -> Self {
        Self {
            peer,
            default_timeout: DEFAULT_TIMEOUT,
        }
    }

    /// Configure custom timeout for all operations
    ///
    /// This creates a client handle with a different timeout configuration.
    /// Each client handle has its own independent timeout setting.
    ///
    /// To use different timeouts for different operations, create multiple
    /// client handles with different timeout configurations:
    ///
    /// # Example
    ///
    /// ```ignore
    /// let conn = /* connection */;
    ///
    /// // Quick operations (10s timeout)
    /// let quick_client = conn.client().with_timeout(Duration::from_secs(10));
    ///
    /// // Long operations (5min timeout)
    /// let slow_client = conn.client().with_timeout(Duration::from_secs(300));
    ///
    /// // Use the appropriate client for each operation
    /// quick_client.call_tool("fast_tool", args).await?;
    /// slow_client.call_tool("slow_tool", args).await?;
    /// ```
    #[must_use]
    pub fn with_timeout(mut self, duration: Duration) -> Self {
        self.default_timeout = duration;
        self
    }

    /// Get server information
    #[must_use]
    pub fn server_info(&self) -> Option<&InitializeResult> {
        self.peer.peer_info()
    }

    /// List all available tools
    ///
    /// # Errors
    ///
    /// Returns `ClientError::Timeout` if the operation exceeds the configured timeout,
    /// or `ClientError::ServiceError` if the MCP request fails.
    pub async fn list_tools(&self) -> Result<Vec<rmcp::model::Tool>, ClientError> {
        timeout(self.default_timeout, self.peer.list_all_tools())
            .await
            .map_err(|_| ClientError::Timeout {
                operation: "list_tools".to_string(),
                duration: self.default_timeout,
            })?
            .map_err(ClientError::from)
    }

    /// Call a tool by name with JSON arguments
    ///
    /// # Errors
    ///
    /// Returns `ClientError::Timeout` if the operation exceeds the configured timeout,
    /// or `ClientError::ServiceError` if the tool call fails or the tool does not exist.
    pub async fn call_tool(
        &self,
        name: &str,
        arguments: serde_json::Value,
    ) -> Result<CallToolResult, ClientError> {
        let call = self.peer.call_tool(CallToolRequestParam {
            // name.to_string() allocation is required because CallToolRequestParam
            // expects Cow<'static, str>. Cannot use borrowed reference from &str parameter
            // as it doesn't satisfy the 'static lifetime requirement.
            name: name.to_string().into(),
            arguments: match arguments {
                serde_json::Value::Object(map) => Some(map),
                serde_json::Value::Null => None,
                other => {
                    return Err(ClientError::Protocol(format!(
                        "Tool arguments must be a JSON object or null, got {}",
                        json_type_name(&other)
                    )));
                }
            },
        });

        timeout(self.default_timeout, call)
            .await
            .map_err(|_| ClientError::Timeout {
                operation: format!("Tool '{}'", name),
                duration: self.default_timeout,
            })?
            .map_err(ClientError::from)
    }

    /// Call a tool and deserialize the response to a typed structure
    ///
    /// This provides type-safe parsing with clear error messages instead of fragile
    /// manual JSON extraction with nested Options. Use this with response types from
    /// the `responses` module for better error handling.
    ///
    /// # Example
    ///
    /// ```ignore
    /// use kodegen_mcp_client::responses::StartCrawlResponse;
    ///
    /// let response: StartCrawlResponse = client
    ///     .call_tool_typed("start_crawl", json!({...}))
    ///     .await?;
    /// let session_id = response.session_id;
    /// ```
    ///
    /// # Errors
    ///
    /// Returns `ClientError::ParseError` if the response cannot be deserialized,
    /// or any error from the underlying `call_tool` method.
    pub async fn call_tool_typed<T>(
        &self,
        name: &str,
        arguments: serde_json::Value,
    ) -> Result<T, ClientError>
    where
        T: serde::de::DeserializeOwned,
    {
        let result = self.call_tool(name, arguments).await?;

        // Extract text content from response - search all items
        let text_content = result
            .content
            .iter()
            .find_map(|c| c.as_text())
            .ok_or_else(|| {
                if result.content.is_empty() {
                    ClientError::Protocol(format!("Tool '{}' returned empty content array", name))
                } else {
                    // Show what content types were returned
                    let content_types: Vec<_> = result
                        .content
                        .iter()
                        .map(|c| match c.raw {
                            rmcp::model::RawContent::Text(_) => "text",
                            rmcp::model::RawContent::Image(_) => "image",
                            rmcp::model::RawContent::Resource(_) => "resource",
                            rmcp::model::RawContent::Audio(_) => "audio",
                            rmcp::model::RawContent::ResourceLink(_) => "resource_link",
                        })
                        .collect();

                    ClientError::Protocol(format!(
                        "Tool '{}' returned {} content item(s) but none were text: [{}]",
                        name,
                        result.content.len(),
                        content_types.join(", ")
                    ))
                }
            })?;

        // Deserialize to target type with context
        serde_json::from_str(&text_content.text).map_err(|e| ClientError::ParseError {
            tool_name: name.to_string(),
            source: e,
        })
    }
}

/// Connection lifecycle manager for MCP client
///
/// Manages the underlying connection and provides graceful shutdown.
/// NOT Clone - only one owner should manage the connection lifecycle.
///
/// The connection should be held as long as you want the MCP connection to remain active.
///
/// ## Cleanup Behavior
///
/// When dropped or when `close()` is called, the following cleanup occurs:
///
/// 1. **Service Cancellation**: rmcp's `DropGuard` cancels the internal `CancellationToken`
/// 2. **Loop Exit**: Service loop detects cancellation and exits gracefully  
/// 3. **Transport Close**: For stdio connections, `TokioChildProcess.graceful_shutdown()` executes:
///    - Closes stdin to signal server exit
///    - Waits up to 3 seconds for graceful termination
///    - Force kills process if timeout exceeded (SIGKILL/TerminateProcess)
/// 4. **Zombie Prevention**: `tokio::process::Child.wait()` reaps the process
///
/// Both `drop(connection)` and `connection.close().await` trigger the same cleanup flow.
/// Use `close()` if you need to await and handle cleanup errors; use drop for fire-and-forget.
///
/// ## Relationship to Clients
///
/// All client handles created from this connection become invalid when
/// the connection is dropped:
///
/// ```ignore
/// let (client, conn) = create_http_client(url).await?;
/// let client2 = client.clone();
///
/// drop(conn);  // Closes connection
///
/// // Both clients now fail:
/// client.list_tools().await?;  // Error
/// client2.call_tool(...).await?;  // Error
/// ```
///
/// ## Example
///
/// ```ignore
/// // Implicit cleanup via drop
/// {
///     let (client, conn) = create_stdio_client("node", &["server.js"]).await?;
///     // ... use client ...
/// } // Process terminated here via graceful shutdown
///
/// // Explicit cleanup with error handling
/// let (client, conn) = create_stdio_client("node", &["server.js"]).await?;
/// // ... use client ...
/// conn.close().await?;  // Await cleanup, handle errors
/// ```
#[must_use = "Connection must be held to keep MCP service alive"]
pub struct KodegenConnection {
    service: RunningService<RoleClient, ClientInfo>,
}

impl KodegenConnection {
    /// Create connection from running service
    ///
    /// This is a low-level constructor for creating a connection from an already-initialized
    /// MCP service. Most users should use the transport functions like `create_http_client()`
    /// which handle both service creation and connection setup.
    pub fn from_service(service: RunningService<RoleClient, ClientInfo>) -> Self {
        Self { service }
    }

    /// Get a clone-able client handle for MCP operations
    ///
    /// This creates a lightweight client handle that can be cloned and shared.
    /// Multiple client handles can coexist and all operate on the same underlying connection.
    #[must_use]
    pub fn client(&self) -> KodegenClient {
        KodegenClient::from_peer(self.service.peer().clone())
    }

    /// Graceful shutdown with proper MCP protocol cancellation
    ///
    /// Consumes the connection and performs a clean shutdown of the MCP protocol.
    /// This triggers the same cleanup as dropping the connection, but allows you to
    /// await completion and handle any errors.
    ///
    /// ## Cleanup Sequence
    ///
    /// 1. Cancels the service task via `CancellationToken`
    /// 2. Service loop exits and calls `transport.close()`
    /// 3. For stdio transports: stdin closed, 3-second wait, force kill if needed
    /// 4. Process reaped to prevent zombies
    ///
    /// ## Drop vs Close
    ///
    /// - **drop(connection)**: Fire-and-forget cleanup (cleanup errors logged but not returned)
    /// - **connection.close().await**: Awaitable cleanup (returns errors to caller)
    ///
    /// # Errors
    ///
    /// Returns `ClientError` if:
    /// - Service cancellation fails
    /// - Transport close fails (e.g., process kill error)
    /// - Service task panicked
    pub async fn close(self) -> Result<(), ClientError> {
        self.service
            .cancel()
            .await
            .map(|_| ())
            .map_err(ClientError::from)
    }

    /// Wait for the connection to close naturally
    ///
    /// This will block until the remote side closes the connection or an error occurs.
    /// Useful for long-lived connections where you want to keep the connection alive
    /// until the remote end terminates it.
    ///
    /// # Errors
    ///
    /// Returns `ClientError` if the connection fails or closes with an error.
    pub async fn wait(self) -> Result<(), ClientError> {
        self.service
            .waiting()
            .await
            .map(|_| ())
            .map_err(ClientError::from)
    }
}