mcp-compressor-core 0.21.3

Internal Rust core for mcp-compressor. Prefer the public mcp-compressor crate.
Documentation
use std::fs;
use std::net::{Ipv4Addr, SocketAddr};
use std::sync::Arc;

use crate::client_gen::cli::CliGenerator;
use crate::client_gen::{ClientGenerator, GeneratorConfig};
use crate::ffi::{generate_client_artifacts, FfiClientArtifactKind, FfiGeneratorConfig};
use crate::proxy::ToolProxyServer;
use crate::server::registration::FrontendServer;
use crate::server::CompressedServer;

use super::banner::{self, CliInfo};
use super::options::{CliOptions, FrontendTransport};
use super::paths::cli_output_dir;

pub async fn run_compressed_server(
    cli: &CliOptions,
    server: CompressedServer,
) -> Result<(), String> {
    match cli.transport {
        FrontendTransport::Stdio => run_compressed_stdio(server).await,
        FrontendTransport::StreamableHttp => run_compressed_streamable_http(cli, server).await,
    }
}

async fn run_compressed_stdio(server: CompressedServer) -> Result<(), String> {
    banner::print_banner(
        server.default_server_name(),
        "stdio",
        server.compression_level(),
        &server.backend_tools(),
        None,
    );
    rmcp::serve_server(FrontendServer::new(server), rmcp::transport::stdio())
        .await
        .map_err(|error| error.to_string())?
        .waiting()
        .await
        .map_err(|error| error.to_string())?;
    Ok(())
}

async fn run_compressed_streamable_http(
    cli: &CliOptions,
    server: CompressedServer,
) -> Result<(), String> {
    use rmcp::transport::streamable_http_server::{
        session::local::LocalSessionManager, tower::StreamableHttpService,
        StreamableHttpServerConfig,
    };

    banner::print_banner(
        server.default_server_name(),
        "streamable-http",
        server.compression_level(),
        &server.backend_tools(),
        None,
    );

    let service = StreamableHttpService::new(
        {
            let server = Arc::new(server);
            move || Ok(FrontendServer::from_arc(server.clone()))
        },
        Arc::new(LocalSessionManager::default()),
        StreamableHttpServerConfig::default().with_sse_keep_alive(None),
    );
    let router = axum::Router::new().nest_service("/mcp", service);
    let listener = tokio::net::TcpListener::bind(SocketAddr::from((Ipv4Addr::LOCALHOST, cli.port)))
        .await
        .map_err(|error| error.to_string())?;
    let addr = listener.local_addr().map_err(|error| error.to_string())?;
    eprintln!("Streamable HTTP MCP server listening on http://{addr}/mcp");
    axum::serve(listener, router)
        .await
        .map_err(|error| error.to_string())?;
    Ok(())
}

pub async fn run_just_bash_mode(cli: CliOptions, server: CompressedServer) -> Result<(), String> {
    let proxy = ToolProxyServer::start(server)
        .await
        .map_err(|error| error.to_string())?;
    let cli_name = cli
        .server_name
        .clone()
        .unwrap_or_else(|| "bash".to_string());

    println!("Just Bash ready");
    println!("Bridge URL: {}", proxy.bridge_url());
    println!("Use backend commands through the generated bridge. Full just-bash AST execution is not implemented in Rust yet.");
    println!("Session: {cli_name}");
    println!("Press Ctrl+C to stop.");

    wait_until_stopped().await;
    Ok(())
}

pub async fn run_cli_mode(cli: CliOptions, server: CompressedServer) -> Result<(), String> {
    let tools = server
        .single_backend_tools()
        .map_err(|error| error.to_string())?;
    let proxy = ToolProxyServer::start(server)
        .await
        .map_err(|error| error.to_string())?;
    let client_artifact_kind = cli.client_artifact_kind();
    let (output_dir, on_path) = if let Some(output_dir) = cli.output_dir.clone() {
        let on_path = std::env::var_os("PATH")
            .map(|paths| std::env::split_paths(&paths).any(|path| path == output_dir))
            .unwrap_or(false);
        (output_dir, on_path)
    } else if client_artifact_kind.is_some() {
        (
            std::env::current_dir()
                .map_err(|error| error.to_string())?
                .join("dist"),
            false,
        )
    } else {
        cli_output_dir().map_err(|error| error.to_string())?
    };
    let cli_name = cli.server_name.clone().unwrap_or_else(|| "mcp".to_string());
    let config = GeneratorConfig {
        cli_name: cli_name.clone(),
        bridge_url: proxy.bridge_url().to_string(),
        token: proxy.token_value().to_string(),
        tools,
        session_pid: std::process::id(),
        output_dir,
        extra_cli_bridges: Vec::new(),
    };
    let paths = if let Some(kind) = client_artifact_kind {
        let ffi_config = FfiGeneratorConfig {
            cli_name: config.cli_name.clone(),
            bridge_url: config.bridge_url.clone(),
            token: config.token.clone(),
            tools: config.tools.clone().into_iter().map(Into::into).collect(),
            session_pid: config.session_pid,
            output_dir: config.output_dir.clone(),
        };
        generate_client_artifacts(kind, ffi_config).map_err(|error| error.to_string())?
    } else {
        generate_or_update_cli_script(&config).map_err(|error| error.to_string())?
    };
    let script = paths
        .iter()
        .find(|path| path.file_name().and_then(|name| name.to_str()) == Some(cli_name.as_str()))
        .unwrap_or(&paths[0]);

    let transport_label = if cli.server_name.is_some() {
        "stdio"
    } else {
        "stdio"
    };
    banner::print_banner(
        cli.server_name.as_deref(),
        transport_label,
        &crate::compression::CompressionLevel::Max,
        &config.tools,
        Some(CliInfo {
            script_path: Some(&script.display().to_string()),
            bridge_url: Some(&proxy.bridge_url().to_string()),
            invoke_prefix: Some(&cli_name),
        }),
    );

    if let Some(kind) = client_artifact_kind {
        println!("{} code client ready", client_artifact_label(kind));
        println!("Generated files:");
        for path in &paths {
            println!("  {}", path.display());
        }
    } else {
        println!("CLI ready");
        println!("Generated CLI: {}", script.display());
    }
    if client_artifact_kind.is_none() {
        if on_path {
            println!("Invoke with: {cli_name} <subcommand> [args...]");
        } else {
            println!("Invoke with: {} <subcommand> [args...]", script.display());
            println!(
                "Note: {} is not on PATH; add it to PATH to run `{cli_name}` directly.",
                script
                    .parent()
                    .unwrap_or_else(|| std::path::Path::new("."))
                    .display()
            );
        }
    } else {
        println!("Import the generated client from your agent code while this process is running.");
    }
    println!("Bridge URL: {}", proxy.bridge_url());
    println!("Press Ctrl+C to stop.");

    wait_until_stopped().await;
    Ok(())
}

fn client_artifact_label(kind: FfiClientArtifactKind) -> &'static str {
    match kind {
        FfiClientArtifactKind::Cli => "CLI",
        FfiClientArtifactKind::Python => "Python",
        FfiClientArtifactKind::TypeScript => "TypeScript",
    }
}

async fn wait_until_stopped() {
    if std::env::var_os("MCP_COMPRESSOR_EXIT_AFTER_READY").is_some() {
        return;
    }

    match tokio::signal::ctrl_c().await {
        Ok(()) => {}
        Err(_) => std::future::pending::<()>().await,
    }
}

fn generate_or_update_cli_script(
    config: &GeneratorConfig,
) -> Result<Vec<std::path::PathBuf>, crate::Error> {
    use crate::client_gen::generator::write_artifacts;

    let mut merged_config = config.clone();
    let script_path = config.output_dir.join(&config.cli_name);
    if let Ok(existing) = fs::read_to_string(&script_path) {
        merged_config.extra_cli_bridges =
            crate::client_gen::cli::read_live_bridge_entries(&existing);
    }
    let artifacts = CliGenerator.render(&merged_config)?;
    write_artifacts(&artifacts, &config.output_dir)
}