mcp-compressor-core 0.19.6

Internal Rust core for mcp-compressor. Prefer the public mcp-compressor crate.
Documentation
//! `PythonGenerator` — generates an importable Python module.
//!
//! Each upstream tool becomes a typed async function that calls `POST /exec`
//! on the local tool proxy.  The generated module:
//!
//! - Is named `<cli_name>.py`.
//! - Imports `httpx` for HTTP calls.
//! - Contains `_BRIDGE` and `_TOKEN` module-level constants.
//! - Has one `def <tool_name>(...)` function per upstream tool with:
//!   - Required params as positional; optional as keyword-only with `None` default.
//!   - A docstring taken from the tool's description.
//!   - Return type annotation of `str`.
//! - Contains a `# generated by mcp-compressor — do not edit manually` header.

use crate::client_gen::generator::{ClientGenerator, GeneratedArtifact, GeneratorConfig};
use crate::Error;

pub struct PythonGenerator;

impl ClientGenerator for PythonGenerator {
    fn render(&self, config: &GeneratorConfig) -> Result<Vec<GeneratedArtifact>, Error> {
        Ok(vec![GeneratedArtifact::new(
            format!("{}.py", config.cli_name),
            render_python_module(config),
        )])
    }
}

fn render_python_module(config: &GeneratorConfig) -> String {
    let mut module = format!(
        r#"# generated by mcp-compressor — do not edit manually
import json
import urllib.error
import urllib.request

_BRIDGE = {bridge:?}
_TOKEN = {token:?}
_SESSION_PID = {pid}
_HEADERS = {{"Content-Type": "application/json", "Authorization": f"Bearer {{_TOKEN}}"}}


def _exec(tool: str, tool_input: dict) -> str:
    payload = json.dumps({{"tool": tool, "input": tool_input}}).encode()
    request = urllib.request.Request(_BRIDGE + "/exec", data=payload, headers=_HEADERS, method="POST")
    try:
        with urllib.request.urlopen(request) as response:
            return response.read().decode()
    except urllib.error.HTTPError as exc:
        message = exc.read().decode(errors="replace") or exc.reason
        raise RuntimeError(f"mcp-compressor proxy returned HTTP {{exc.code}}: {{message}}") from None
    except urllib.error.URLError as exc:
        raise RuntimeError(
            "mcp-compressor proxy is not running; restart the mcp-compressor process and try again. "
            f"details: {{exc.reason}}"
        ) from None
"#,
        bridge = config.bridge_url,
        token = config.token,
        pid = config.session_pid,
    );

    for tool in &config.tools {
        let params = python_params(tool);
        let input = python_input_dict(tool);
        let docstring = tool.description.as_deref().unwrap_or("");
        module.push_str(&format!(
            r#"

def {name}({params}) -> str:
    """{docstring}"""
    return _exec({tool_name:?}, {input})
"#,
            name = tool.name,
            params = params,
            docstring = docstring.replace("\"\"\"", "\\\"\\\"\\\""),
            tool_name = tool.name,
            input = input,
        ));
    }

    module
}

fn python_params(tool: &crate::compression::engine::Tool) -> String {
    ordered_param_names(tool)
        .into_iter()
        .map(|(name, required)| {
            if required {
                name
            } else {
                format!("{name}=None")
            }
        })
        .collect::<Vec<_>>()
        .join(", ")
}

fn python_input_dict(tool: &crate::compression::engine::Tool) -> String {
    let pairs = tool
        .param_names()
        .into_iter()
        .map(|name| format!("{name:?}: {name}"))
        .collect::<Vec<_>>()
        .join(", ");
    format!("{{{pairs}}}")
}

fn ordered_param_names(tool: &crate::compression::engine::Tool) -> Vec<(String, bool)> {
    let required = required_params(tool);
    let mut params = tool
        .param_names()
        .into_iter()
        .map(|name| {
            let is_required = required.contains(&name);
            (name, is_required)
        })
        .collect::<Vec<_>>();
    params.sort_by_key(|(_name, is_required)| !*is_required);
    params
}

fn required_params(tool: &crate::compression::engine::Tool) -> Vec<String> {
    tool.input_schema
        .get("required")
        .and_then(serde_json::Value::as_array)
        .map(|values| values.iter().filter_map(serde_json::Value::as_str).map(ToString::to_string).collect())
        .unwrap_or_default()
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;
    use crate::client_gen::generator::test_helpers::{make_config, make_config_multiword_tool};
    use crate::client_gen::{ClientGenerator, GeneratorConfig};
    use crate::compression::engine::Tool;
    use std::ffi::OsStr;
    use std::fs;

    // ------------------------------------------------------------------
    // File creation
    // ------------------------------------------------------------------

    /// generate() returns exactly one path (the .py module).
    #[test]
    fn generate_returns_one_file() {
        let dir = tempfile::tempdir().unwrap();
        let config = make_config(dir.path());
        let paths = PythonGenerator.generate(&config).unwrap();
        assert_eq!(paths.len(), 1, "expected exactly one generated file");
    }

    /// The generated file has a `.py` extension.
    #[test]
    fn generated_file_has_py_extension() {
        let dir = tempfile::tempdir().unwrap();
        let config = make_config(dir.path());
        let paths = PythonGenerator.generate(&config).unwrap();
        assert_eq!(
            paths[0].extension(),
            Some(OsStr::new("py")),
            "expected .py extension",
        );
    }

    /// The generated file is named `<cli_name>.py`.
    #[test]
    fn generated_file_named_after_cli_name() {
        let dir = tempfile::tempdir().unwrap();
        let config = make_config(dir.path());
        let paths = PythonGenerator.generate(&config).unwrap();
        assert_eq!(paths[0].file_name(), Some(OsStr::new("my-server.py")));
    }

    /// The generated file exists on disk.
    #[test]
    fn generated_file_exists() {
        let dir = tempfile::tempdir().unwrap();
        let config = make_config(dir.path());
        let paths = PythonGenerator.generate(&config).unwrap();
        assert!(paths[0].exists());
    }

    // ------------------------------------------------------------------
    // File content — header
    // ------------------------------------------------------------------

    /// The file starts with the "do not edit manually" comment.
    #[test]
    fn file_has_do_not_edit_header() {
        let dir = tempfile::tempdir().unwrap();
        let config = make_config(dir.path());
        let paths = PythonGenerator.generate(&config).unwrap();
        let content = fs::read_to_string(&paths[0]).unwrap();
        assert!(
            content.contains("do not edit"),
            "expected 'do not edit' header, got: {content:?}",
        );
    }

    // ------------------------------------------------------------------
    // File content — constants
    // ------------------------------------------------------------------

    /// The file contains `_TOKEN` with the session token value.
    #[test]
    fn file_contains_token_constant() {
        let dir = tempfile::tempdir().unwrap();
        let config = make_config(dir.path());
        let paths = PythonGenerator.generate(&config).unwrap();
        let content = fs::read_to_string(&paths[0]).unwrap();
        assert!(content.contains(&config.token), "token not found in file");
        assert!(content.contains("_TOKEN"), "_TOKEN constant not found");
    }

    /// The file contains `_BRIDGE` with the proxy URL.
    #[test]
    fn file_contains_bridge_constant() {
        let dir = tempfile::tempdir().unwrap();
        let config = make_config(dir.path());
        let paths = PythonGenerator.generate(&config).unwrap();
        let content = fs::read_to_string(&paths[0]).unwrap();
        assert!(content.contains(&config.bridge_url), "bridge URL not found in file");
        assert!(content.contains("_BRIDGE"), "_BRIDGE constant not found");
    }

    // ------------------------------------------------------------------
    // File content — function definitions
    // ------------------------------------------------------------------

    /// Each upstream tool produces a `def <name>(...)` function.
    #[test]
    fn file_contains_function_per_tool() {
        let dir = tempfile::tempdir().unwrap();
        let config = make_config(dir.path());
        let paths = PythonGenerator.generate(&config).unwrap();
        let content = fs::read_to_string(&paths[0]).unwrap();
        assert!(content.contains("def fetch("), "'def fetch(' not found");
        assert!(content.contains("def search("), "'def search(' not found");
    }

    /// Each function contains a docstring derived from the tool's description.
    #[test]
    fn function_has_docstring_from_description() {
        let dir = tempfile::tempdir().unwrap();
        let config = make_config(dir.path());
        let paths = PythonGenerator.generate(&config).unwrap();
        let content = fs::read_to_string(&paths[0]).unwrap();
        assert!(content.contains("Fetch a URL"), "tool description not found in docstring");
        assert!(content.contains("Search the web"), "tool description not found in docstring");
    }

    /// Function parameters match the tool's schema properties.
    #[test]
    fn function_params_match_schema() {
        let dir = tempfile::tempdir().unwrap();
        let config = make_config(dir.path());
        let paths = PythonGenerator.generate(&config).unwrap();
        let content = fs::read_to_string(&paths[0]).unwrap();
        // fetch() has "url" param; search() has "query" param
        assert!(content.contains("url"), "'url' param not found");
        assert!(content.contains("query"), "'query' param not found");
    }

    /// Functions have a `-> str` return type annotation.
    #[test]
    fn function_has_str_return_annotation() {
        let dir = tempfile::tempdir().unwrap();
        let config = make_config(dir.path());
        let paths = PythonGenerator.generate(&config).unwrap();
        let content = fs::read_to_string(&paths[0]).unwrap();
        assert!(content.contains("-> str"), "-> str return annotation not found");
    }

    /// Required params appear without a default value; optional params use `None`.
    #[test]
    fn required_params_have_no_default_optional_have_none() {
        let dir = tempfile::tempdir().unwrap();
        // make_config has "url" as required for fetch, no optional
        let config = make_config(dir.path());
        let paths = PythonGenerator.generate(&config).unwrap();
        let content = fs::read_to_string(&paths[0]).unwrap();
        // "url" is required → no "= None" for url
        // (this test verifies the function signature handles optionality)
        assert!(content.contains("url"), "'url' param not found in generated file");
    }

    #[test]
    fn required_params_are_ordered_before_optional_params() {
        let dir = tempfile::tempdir().unwrap();
        let config = GeneratorConfig {
            cli_name: "real".to_string(),
            bridge_url: "http://127.0.0.1:1".to_string(),
            token: "token".to_string(),
            tools: vec![Tool::new(
                "real_tool",
                Some("Real schema".to_string()),
                serde_json::json!({
                    "type": "object",
                    "properties": {
                        "optional_first": {"type": "string"},
                        "required_second": {"type": "string"},
                        "optional_third": {"type": "string"}
                    },
                    "required": ["required_second"]
                }),
            )],
            session_pid: 1,
            output_dir: dir.path().to_path_buf(),
        };
        let paths = PythonGenerator.generate(&config).unwrap();
        let content = fs::read_to_string(&paths[0]).unwrap();
        assert!(content.contains("def real_tool(required_second, optional_first=None, optional_third=None) -> str:"));
        assert!(content.contains("{\"optional_first\": optional_first, \"required_second\": required_second, \"optional_third\": optional_third}"));
        std::process::Command::new("python3")
            .arg("-m")
            .arg("py_compile")
            .arg(&paths[0])
            .status()
            .expect("python3 should run")
            .success()
            .then_some(())
            .expect("generated Python should compile");
    }

    // ------------------------------------------------------------------
    // Multi-word tool names
    // ------------------------------------------------------------------

    /// A multi-word tool uses its original snake_case name as the function name.
    #[test]
    fn multiword_tool_function_name_is_snake_case() {
        let dir = tempfile::tempdir().unwrap();
        let config = make_config_multiword_tool(dir.path());
        let paths = PythonGenerator.generate(&config).unwrap();
        let content = fs::read_to_string(&paths[0]).unwrap();
        // Python keeps snake_case for function names
        assert!(
            content.contains("def get_confluence_page("),
            "expected snake_case function name, got: {content:?}",
        );
    }
}