// codineer-runtime 0.6.8
//
// Core runtime engine for Codineer: session, config, MCP, prompt, sandbox.
// Documentation
use std::collections::BTreeMap;

use serde_json::Value as JsonValue;

use crate::config::{McpTransport, RuntimeConfig, ScopedMcpServerConfig};
use crate::mcp::mcp_tool_name;
use crate::mcp_client::{McpClientBootstrap, McpClientTransport};
use crate::mcp_remote::McpRemoteClient;

use super::process::{default_initialize_params, spawn_mcp_stdio_process, McpStdioProcess};
use super::types::{
    JsonRpcId, JsonRpcResponse, ManagedMcpTool, McpListToolsParams, McpServerManagerError,
    McpToolCallParams, McpToolCallResult, UnsupportedMcpServer,
};

/// Routing entry stored in the manager's tool index.
///
/// Maps a qualified tool name (the index key) back to the server that
/// advertised the tool and the tool's name as that server reported it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ToolRoute {
    /// Name of the managed server that owns the tool.
    pub(crate) server_name: String,
    /// Tool name as reported by the server; sent as `name` in `tools/call`.
    pub(crate) raw_name: String,
}

/// Live handle to a managed MCP server, one variant per transport family.
///
/// Both payloads are boxed, so the enum itself stays pointer-sized per variant.
#[derive(Debug)]
pub(crate) enum McpServerProcess {
    /// Server reached through a spawned stdio process.
    Stdio(Box<McpStdioProcess>),
    /// Server reached through a remote client (SSE, HTTP or WebSocket transport).
    Remote(Box<McpRemoteClient>),
}

/// Per-server state tracked by `McpServerManager`.
#[derive(Debug)]
pub(crate) struct ManagedMcpServer {
    /// Bootstrap data used to spawn or connect to the server.
    pub(crate) bootstrap: McpClientBootstrap,
    /// Live handle, or `None` while the server has not been started
    /// (or after it has been shut down).
    pub(crate) process: Option<McpServerProcess>,
    /// `true` once an `initialize` request has returned a result for the
    /// current `process`.
    pub(crate) initialized: bool,
}

impl ManagedMcpServer {
    /// Creates the bookkeeping entry for a server that has not been started:
    /// no live handle yet and not initialized.
    pub(crate) fn new(bootstrap: McpClientBootstrap) -> Self {
        let process = None;
        let initialized = false;
        Self {
            bootstrap,
            process,
            initialized,
        }
    }
}

/// Owns the set of configured MCP servers, starts them on demand, and routes
/// tool calls to the server that advertised each tool.
#[derive(Debug)]
pub struct McpServerManager {
    /// Managed servers keyed by server name.
    servers: BTreeMap<String, ManagedMcpServer>,
    /// Configured servers whose transport this manager does not handle.
    unsupported_servers: Vec<UnsupportedMcpServer>,
    /// Qualified tool name -> owning server and raw tool name; filled by
    /// `discover_tools`.
    tool_index: BTreeMap<String, ToolRoute>,
    /// Next JSON-RPC request id to hand out; starts at 1.
    next_request_id: u64,
}

impl McpServerManager {
    /// Builds a manager from the MCP server table of a runtime configuration.
    ///
    /// Thin wrapper over [`McpServerManager::from_servers`].
    #[must_use]
    pub fn from_runtime_config(config: &RuntimeConfig) -> Self {
        let mcp = config.mcp();
        Self::from_servers(mcp.servers())
    }

    /// Builds a manager from a table of scoped server configurations.
    ///
    /// Servers using the stdio, SSE, HTTP or WebSocket transport become managed
    /// entries (not started yet). Servers using the SDK or managed-proxy
    /// transport are recorded as unsupported, together with a reason string.
    /// The tool index starts empty and request ids start at 1.
    #[must_use]
    pub fn from_servers(servers: &BTreeMap<String, ScopedMcpServerConfig>) -> Self {
        let mut manager = Self {
            servers: BTreeMap::new(),
            unsupported_servers: Vec::new(),
            tool_index: BTreeMap::new(),
            next_request_id: 1,
        };

        for (server_name, server_config) in servers {
            let transport = server_config.transport();
            match transport {
                McpTransport::Sdk | McpTransport::ManagedProxy => {
                    let reason =
                        format!("transport {transport:?} is not supported by McpServerManager");
                    manager.unsupported_servers.push(UnsupportedMcpServer {
                        server_name: server_name.clone(),
                        transport,
                        reason,
                    });
                }
                McpTransport::Stdio | McpTransport::Sse | McpTransport::Http | McpTransport::Ws => {
                    let entry = ManagedMcpServer::new(McpClientBootstrap::from_scoped_config(
                        server_name,
                        server_config,
                    ));
                    manager.servers.insert(server_name.clone(), entry);
                }
            }
        }

        manager
    }

    /// Returns the configured servers that were skipped because their
    /// transport is not handled by this manager.
    #[must_use]
    pub fn unsupported_servers(&self) -> &[UnsupportedMcpServer] {
        self.unsupported_servers.as_slice()
    }

    /// Lists the tools of every managed server and rebuilds the tool index.
    ///
    /// For each server (in key order of the server map) this makes sure the
    /// server is started and initialized, drops the routes previously recorded
    /// for it, and then issues `tools/list` requests, following `next_cursor`
    /// until the server stops returning one. Every tool is registered in the
    /// index under its qualified name and appended to the returned list.
    ///
    /// # Errors
    ///
    /// Returns the first failure encountered: a start-up/initialize error, a
    /// transport error, a JSON-RPC error response, or a response without a
    /// result payload. Routes registered before the failure stay in the index.
    pub async fn discover_tools(&mut self) -> Result<Vec<ManagedMcpTool>, McpServerManagerError> {
        const METHOD: &str = "tools/list";

        // Snapshot the names so `self` can be borrowed mutably inside the loop.
        let names: Vec<String> = self.servers.keys().cloned().collect();
        let mut tools = Vec::new();

        for name in &names {
            self.ensure_server_ready(name).await?;
            self.clear_routes_for_server(name);

            let mut cursor = None;
            loop {
                // The id is consumed before the process lookup, so a failed
                // lookup still advances the counter.
                let id = self.take_request_id();

                let process = match self.server_mut(name)?.process.as_mut() {
                    Some(process) => process,
                    None => {
                        return Err(McpServerManagerError::InvalidResponse {
                            server_name: name.clone(),
                            method: METHOD,
                            details: "server process missing after initialization".to_string(),
                        });
                    }
                };
                let page_request = Some(McpListToolsParams {
                    cursor: cursor.clone(),
                });
                let response = match process {
                    McpServerProcess::Stdio(stdio) => stdio.list_tools(id, page_request).await?,
                    McpServerProcess::Remote(remote) => {
                        remote.list_tools(id, page_request).await?
                    }
                };

                if let Some(error) = response.error {
                    return Err(McpServerManagerError::JsonRpc {
                        server_name: name.clone(),
                        method: METHOD,
                        error,
                    });
                }

                let page = match response.result {
                    Some(page) => page,
                    None => {
                        return Err(McpServerManagerError::InvalidResponse {
                            server_name: name.clone(),
                            method: METHOD,
                            details: "missing result payload".to_string(),
                        });
                    }
                };

                for tool in page.tools {
                    let raw_name = tool.name.clone();
                    let qualified_name = mcp_tool_name(name, &raw_name);
                    let route = ToolRoute {
                        server_name: name.clone(),
                        raw_name: raw_name.clone(),
                    };
                    self.tool_index.insert(qualified_name.clone(), route);
                    tools.push(ManagedMcpTool {
                        server_name: name.clone(),
                        qualified_name,
                        raw_name,
                        tool,
                    });
                }

                // No cursor in the response means this was the last page.
                cursor = page.next_cursor;
                if cursor.is_none() {
                    break;
                }
            }
        }

        Ok(tools)
    }

    pub async fn call_tool(
        &mut self,
        qualified_tool_name: &str,
        arguments: Option<JsonValue>,
    ) -> Result<JsonRpcResponse<McpToolCallResult>, McpServerManagerError> {
        let route = self
            .tool_index
            .get(qualified_tool_name)
            .cloned()
            .ok_or_else(|| McpServerManagerError::UnknownTool {
                qualified_name: qualified_tool_name.to_string(),
            })?;

        self.ensure_server_ready(&route.server_name).await?;
        let request_id = self.take_request_id();
        let params = McpToolCallParams {
            name: route.raw_name,
            arguments,
            meta: None,
        };
        let response =
            {
                let server = self.server_mut(&route.server_name)?;
                let process = server.process.as_mut().ok_or_else(|| {
                    McpServerManagerError::InvalidResponse {
                        server_name: route.server_name.clone(),
                        method: "tools/call",
                        details: "server process missing after initialization".to_string(),
                    }
                })?;
                match process {
                    McpServerProcess::Stdio(p) => p.call_tool(request_id, params).await?,
                    McpServerProcess::Remote(c) => c.call_tool(request_id, params).await?,
                }
            };
        Ok(response)
    }

    /// Shuts down every managed server that currently has a live handle.
    ///
    /// Every server is attempted, even if an earlier one fails to shut down;
    /// a single failing server therefore no longer prevents the remaining
    /// servers from being stopped. A server whose shutdown succeeds (or that
    /// was never started) ends up with no handle and `initialized == false`.
    /// A server whose shutdown fails keeps its handle and state so the call
    /// can be retried. The tool index is left untouched.
    ///
    /// # Errors
    ///
    /// Returns the first shutdown error encountered, after all servers have
    /// been attempted.
    pub async fn shutdown(&mut self) -> Result<(), McpServerManagerError> {
        let mut first_error: Option<McpServerManagerError> = None;

        for server in self.servers.values_mut() {
            let outcome: Result<(), McpServerManagerError> = match server.process.as_mut() {
                Some(McpServerProcess::Stdio(p)) => p.shutdown().await.map_err(Into::into),
                Some(McpServerProcess::Remote(c)) => c.shutdown().await.map_err(Into::into),
                None => Ok(()),
            };

            match outcome {
                Ok(()) => {
                    server.process = None;
                    server.initialized = false;
                }
                Err(error) => {
                    // Remember only the first failure, but keep going so the
                    // remaining servers are still stopped.
                    if first_error.is_none() {
                        first_error = Some(error);
                    }
                }
            }
        }

        match first_error {
            Some(error) => Err(error),
            None => Ok(()),
        }
    }

    /// Removes every tool-index entry that routes to `server_name`.
    fn clear_routes_for_server(&mut self, server_name: &str) {
        self.tool_index.retain(|_qualified_name, route| {
            let owned_by_server = route.server_name.as_str() == server_name;
            !owned_by_server
        });
    }

    /// Looks up the mutable state of a managed server.
    ///
    /// # Errors
    ///
    /// Returns `UnknownServer` when no managed server has that name.
    fn server_mut(
        &mut self,
        server_name: &str,
    ) -> Result<&mut ManagedMcpServer, McpServerManagerError> {
        match self.servers.get_mut(server_name) {
            Some(server) => Ok(server),
            None => Err(McpServerManagerError::UnknownServer {
                server_name: server_name.to_string(),
            }),
        }
    }

    /// Hands out the current request id and advances the counter.
    ///
    /// The counter saturates at `u64::MAX` instead of wrapping.
    fn take_request_id(&mut self) -> JsonRpcId {
        let current = self.next_request_id;
        self.next_request_id = current.saturating_add(1);
        JsonRpcId::Number(current)
    }

    /// Makes sure `server_name` has a live handle and has been initialized.
    ///
    /// If the server has no handle, one is created according to its bootstrap
    /// transport (a spawned stdio process, or a remote client for SSE, HTTP
    /// and WebSocket) and the server is marked as not initialized. If the
    /// server is not initialized, an `initialize` request is sent and the
    /// server is marked initialized once a result payload comes back.
    ///
    /// # Errors
    ///
    /// Returns `UnknownServer` for an unmanaged name, a spawn/connect error,
    /// `InvalidResponse` for any other transport or a missing result payload,
    /// `JsonRpc` when the server answers `initialize` with an error, and any
    /// transport error raised while sending the request.
    async fn ensure_server_ready(
        &mut self,
        server_name: &str,
    ) -> Result<(), McpServerManagerError> {
        // Step 1: create the transport handle if there is none yet.
        let server = self.server_mut(server_name)?;
        if server.process.is_none() {
            let bootstrap = &server.bootstrap;
            let process = match &bootstrap.transport {
                McpClientTransport::Stdio(_) => {
                    let child = spawn_mcp_stdio_process(bootstrap)?;
                    McpServerProcess::Stdio(Box::new(child))
                }
                McpClientTransport::Sse(_)
                | McpClientTransport::Http(_)
                | McpClientTransport::WebSocket(_) => {
                    let client = McpRemoteClient::connect(bootstrap).await.map_err(|e| {
                        McpServerManagerError::SpawnFailed {
                            server_name: server_name.to_string(),
                            source: e,
                        }
                    })?;
                    McpServerProcess::Remote(Box::new(client))
                }
                other => {
                    return Err(McpServerManagerError::InvalidResponse {
                        server_name: server_name.to_string(),
                        method: "connect",
                        details: format!("transport {other:?} not supported"),
                    });
                }
            };
            server.process = Some(process);
            // A fresh handle always needs a new initialize handshake.
            server.initialized = false;
        }

        // Step 2: nothing more to do when the handshake already happened.
        if server.initialized {
            return Ok(());
        }

        let request_id = self.take_request_id();
        let params = default_initialize_params();

        let process = match self.server_mut(server_name)?.process.as_mut() {
            Some(process) => process,
            None => {
                return Err(McpServerManagerError::InvalidResponse {
                    server_name: server_name.to_string(),
                    method: "initialize",
                    details: "server process missing before initialize".to_string(),
                });
            }
        };
        let response = match process {
            McpServerProcess::Stdio(stdio) => stdio.initialize(request_id, params).await?,
            McpServerProcess::Remote(remote) => remote.initialize(request_id, params).await?,
        };

        if let Some(error) = response.error {
            return Err(McpServerManagerError::JsonRpc {
                server_name: server_name.to_string(),
                method: "initialize",
                error,
            });
        }

        if response.result.is_none() {
            return Err(McpServerManagerError::InvalidResponse {
                server_name: server_name.to_string(),
                method: "initialize",
                details: "missing result payload".to_string(),
            });
        }

        self.server_mut(server_name)?.initialized = true;
        Ok(())
    }
}