use crate::client_gen::generator::{ClientGenerator, GeneratedArtifact, GeneratorConfig};
use crate::Error;
pub struct PythonGenerator;
impl ClientGenerator for PythonGenerator {
fn render(&self, config: &GeneratorConfig) -> Result<Vec<GeneratedArtifact>, Error> {
Ok(vec![GeneratedArtifact::new(
format!("{}.py", config.cli_name),
render_python_module(config),
)])
}
}
fn render_python_module(config: &GeneratorConfig) -> String {
let mut module = format!(
r#"# generated by mcp-compressor — do not edit manually
import json
import urllib.error
import urllib.request
_BRIDGE = {bridge:?}
_TOKEN = {token:?}
_SESSION_PID = {pid}
_HEADERS = {{"Content-Type": "application/json", "Authorization": f"Bearer {{_TOKEN}}"}}
def _exec(tool: str, tool_input: dict) -> str:
payload = json.dumps({{"tool": tool, "input": tool_input}}).encode()
request = urllib.request.Request(_BRIDGE + "/exec", data=payload, headers=_HEADERS, method="POST")
try:
with urllib.request.urlopen(request) as response:
return response.read().decode()
except urllib.error.HTTPError as exc:
message = exc.read().decode(errors="replace") or exc.reason
raise RuntimeError(f"mcp-compressor proxy returned HTTP {{exc.code}}: {{message}}") from None
except urllib.error.URLError as exc:
raise RuntimeError(
"mcp-compressor proxy is not running; restart the mcp-compressor process and try again. "
f"details: {{exc.reason}}"
) from None
"#,
bridge = config.bridge_url,
token = config.token,
pid = config.session_pid,
);
for tool in &config.tools {
let params = python_params(tool);
let input = python_input_dict(tool);
let docstring = tool.description.as_deref().unwrap_or("");
module.push_str(&format!(
r#"
def {name}({params}) -> str:
"""{docstring}"""
return _exec({tool_name:?}, {input})
"#,
name = tool.name,
params = params,
docstring = docstring.replace("\"\"\"", "\\\"\\\"\\\""),
tool_name = tool.name,
input = input,
));
}
module
}
fn python_params(tool: &crate::compression::engine::Tool) -> String {
ordered_param_names(tool)
.into_iter()
.map(|(name, required)| {
if required {
name
} else {
format!("{name}=None")
}
})
.collect::<Vec<_>>()
.join(", ")
}
fn python_input_dict(tool: &crate::compression::engine::Tool) -> String {
let pairs = tool
.param_names()
.into_iter()
.map(|name| format!("{name:?}: {name}"))
.collect::<Vec<_>>()
.join(", ");
format!("{{{pairs}}}")
}
fn ordered_param_names(tool: &crate::compression::engine::Tool) -> Vec<(String, bool)> {
let required = required_params(tool);
let mut params = tool
.param_names()
.into_iter()
.map(|name| {
let is_required = required.contains(&name);
(name, is_required)
})
.collect::<Vec<_>>();
params.sort_by_key(|(_name, is_required)| !*is_required);
params
}
fn required_params(tool: &crate::compression::engine::Tool) -> Vec<String> {
tool.input_schema
.get("required")
.and_then(serde_json::Value::as_array)
.map(|values| values.iter().filter_map(serde_json::Value::as_str).map(ToString::to_string).collect())
.unwrap_or_default()
}
#[cfg(test)]
mod tests {
use super::*;
use crate::client_gen::generator::test_helpers::{make_config, make_config_multiword_tool};
use crate::client_gen::{ClientGenerator, GeneratorConfig};
use crate::compression::engine::Tool;
use std::ffi::OsStr;
use std::fs;
#[test]
fn generate_returns_one_file() {
let dir = tempfile::tempdir().unwrap();
let config = make_config(dir.path());
let paths = PythonGenerator.generate(&config).unwrap();
assert_eq!(paths.len(), 1, "expected exactly one generated file");
}
#[test]
fn generated_file_has_py_extension() {
let dir = tempfile::tempdir().unwrap();
let config = make_config(dir.path());
let paths = PythonGenerator.generate(&config).unwrap();
assert_eq!(
paths[0].extension(),
Some(OsStr::new("py")),
"expected .py extension",
);
}
#[test]
fn generated_file_named_after_cli_name() {
let dir = tempfile::tempdir().unwrap();
let config = make_config(dir.path());
let paths = PythonGenerator.generate(&config).unwrap();
assert_eq!(paths[0].file_name(), Some(OsStr::new("my-server.py")));
}
#[test]
fn generated_file_exists() {
let dir = tempfile::tempdir().unwrap();
let config = make_config(dir.path());
let paths = PythonGenerator.generate(&config).unwrap();
assert!(paths[0].exists());
}
#[test]
fn file_has_do_not_edit_header() {
let dir = tempfile::tempdir().unwrap();
let config = make_config(dir.path());
let paths = PythonGenerator.generate(&config).unwrap();
let content = fs::read_to_string(&paths[0]).unwrap();
assert!(
content.contains("do not edit"),
"expected 'do not edit' header, got: {content:?}",
);
}
#[test]
fn file_contains_token_constant() {
let dir = tempfile::tempdir().unwrap();
let config = make_config(dir.path());
let paths = PythonGenerator.generate(&config).unwrap();
let content = fs::read_to_string(&paths[0]).unwrap();
assert!(content.contains(&config.token), "token not found in file");
assert!(content.contains("_TOKEN"), "_TOKEN constant not found");
}
#[test]
fn file_contains_bridge_constant() {
let dir = tempfile::tempdir().unwrap();
let config = make_config(dir.path());
let paths = PythonGenerator.generate(&config).unwrap();
let content = fs::read_to_string(&paths[0]).unwrap();
assert!(content.contains(&config.bridge_url), "bridge URL not found in file");
assert!(content.contains("_BRIDGE"), "_BRIDGE constant not found");
}
#[test]
fn file_contains_function_per_tool() {
let dir = tempfile::tempdir().unwrap();
let config = make_config(dir.path());
let paths = PythonGenerator.generate(&config).unwrap();
let content = fs::read_to_string(&paths[0]).unwrap();
assert!(content.contains("def fetch("), "'def fetch(' not found");
assert!(content.contains("def search("), "'def search(' not found");
}
#[test]
fn function_has_docstring_from_description() {
let dir = tempfile::tempdir().unwrap();
let config = make_config(dir.path());
let paths = PythonGenerator.generate(&config).unwrap();
let content = fs::read_to_string(&paths[0]).unwrap();
assert!(content.contains("Fetch a URL"), "tool description not found in docstring");
assert!(content.contains("Search the web"), "tool description not found in docstring");
}
#[test]
fn function_params_match_schema() {
let dir = tempfile::tempdir().unwrap();
let config = make_config(dir.path());
let paths = PythonGenerator.generate(&config).unwrap();
let content = fs::read_to_string(&paths[0]).unwrap();
assert!(content.contains("url"), "'url' param not found");
assert!(content.contains("query"), "'query' param not found");
}
#[test]
fn function_has_str_return_annotation() {
let dir = tempfile::tempdir().unwrap();
let config = make_config(dir.path());
let paths = PythonGenerator.generate(&config).unwrap();
let content = fs::read_to_string(&paths[0]).unwrap();
assert!(content.contains("-> str"), "-> str return annotation not found");
}
#[test]
fn required_params_have_no_default_optional_have_none() {
let dir = tempfile::tempdir().unwrap();
let config = make_config(dir.path());
let paths = PythonGenerator.generate(&config).unwrap();
let content = fs::read_to_string(&paths[0]).unwrap();
assert!(content.contains("url"), "'url' param not found in generated file");
}
#[test]
fn required_params_are_ordered_before_optional_params() {
let dir = tempfile::tempdir().unwrap();
let config = GeneratorConfig {
cli_name: "real".to_string(),
bridge_url: "http://127.0.0.1:1".to_string(),
token: "token".to_string(),
tools: vec![Tool::new(
"real_tool",
Some("Real schema".to_string()),
serde_json::json!({
"type": "object",
"properties": {
"optional_first": {"type": "string"},
"required_second": {"type": "string"},
"optional_third": {"type": "string"}
},
"required": ["required_second"]
}),
)],
session_pid: 1,
output_dir: dir.path().to_path_buf(),
};
let paths = PythonGenerator.generate(&config).unwrap();
let content = fs::read_to_string(&paths[0]).unwrap();
assert!(content.contains("def real_tool(required_second, optional_first=None, optional_third=None) -> str:"));
assert!(content.contains("{\"optional_first\": optional_first, \"required_second\": required_second, \"optional_third\": optional_third}"));
std::process::Command::new("python3")
.arg("-m")
.arg("py_compile")
.arg(&paths[0])
.status()
.expect("python3 should run")
.success()
.then_some(())
.expect("generated Python should compile");
}
#[test]
fn multiword_tool_function_name_is_snake_case() {
let dir = tempfile::tempdir().unwrap();
let config = make_config_multiword_tool(dir.path());
let paths = PythonGenerator.generate(&config).unwrap();
let content = fs::read_to_string(&paths[0]).unwrap();
assert!(
content.contains("def get_confluence_page("),
"expected snake_case function name, got: {content:?}",
);
}
}