klieo-mcp-server 2.2.0

Expose any klieo ToolInvoker or Agent as an MCP server over stdio or HTTP. The inverse of klieo-tools-mcp.

Expose any klieo_core::tool::ToolInvoker as an MCP server over stdio or HTTP. The inverse of klieo-tools-mcp: external MCP hosts (Claude Desktop, Continue, LangGraph, OpenAI Agents SDK) can drive klieo-built tools via the standard Model Context Protocol.

Non-Rust adopters integrate klieo through one of three wire contracts: the NATS bus (durable mixed-language pipelines), this MCP server (stdio/HTTP host-driven tool calls), or the A2A peer protocol (JSON-RPC inter-agent). All three are wire-spec contracts requiring no klieo crate dependency on the non-Rust side.

Status

2.x — stable. See docs/SEMVER.md.

Quickstart

[dependencies]
klieo-mcp-server = "2"
klieo-tools      = "2"
klieo-core       = "2"
klieo-macros     = "2"
tokio            = { version = "1", features = ["full"] }
anyhow           = "1"

use std::sync::Arc;
use klieo_mcp_server::McpServer;
use klieo_tools::ChainedInvoker;
use klieo_macros::tool;

#[tool(name = "echo", description = "Echo the input back")]
async fn echo(text: String) -> Result<String, klieo_core::tool::ToolError> {
    Ok(text)
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let invoker = Arc::new(
        ChainedInvoker::new().with_tool(Arc::new(EchoTool))?,
    );
    let server = McpServer::expose_tools(invoker);
    server.serve_stdio().await?;
    Ok(())
}

McpServer::expose_tools takes any Arc<dyn ToolInvoker> — combine multiple tools via repeated .with_tool(...) calls on ChainedInvoker. For a single-tool quickstart the snippet above is paste-and-run once you point your MCP client at the binary.

Wire to Claude Desktop by adding to ~/Library/Application Support/Claude/claude_desktop_config.json:

{
  "mcpServers": {
    "klieo-tools": { "command": "/abs/path/to/your-mcp-binary" }
  }
}

Protocol

  • JSON-RPC 2.0 newline-delimited frames on stdin/stdout.
  • MCP protocol version 2025-03-26.
  • Methods supported: initialize, shutdown, tools/list, tools/call.
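The newline-delimited framing can be illustrated with a small std-only sketch. The helper names encode_frame and split_frames are hypothetical and are not part of klieo-mcp-server; the JSON is assembled by hand for brevity, whereas a real client would normally use a JSON library.

```rust
/// Build one newline-delimited JSON-RPC 2.0 request frame.
/// `params_json` must already be valid JSON with no raw newlines.
/// Hypothetical helper for illustration; not a klieo-mcp-server API.
fn encode_frame(id: u64, method: &str, params_json: &str) -> String {
    format!(
        "{{\"jsonrpc\":\"2.0\",\"id\":{id},\"method\":\"{method}\",\"params\":{params_json}}}\n"
    )
}

/// Split bytes read from the peer into complete frames.
/// Returns the complete frames plus any trailing partial frame,
/// which the caller should keep until more data arrives.
fn split_frames(buf: &str) -> (Vec<&str>, &str) {
    match buf.rfind('\n') {
        Some(end) => {
            let frames = buf[..end]
                .split('\n')
                .filter(|line| !line.is_empty())
                .collect();
            (frames, &buf[end + 1..])
        }
        None => (Vec::new(), buf),
    }
}
```

Each frame occupies exactly one line, so a reader only needs to buffer until the next newline before handing the line to a JSON parser.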

Scope

Feature                                                          | Status
stdio transport                                                  | shipped
expose_tools(invoker)                                            | shipped
expose_agent_with_schema(agent, schema, ctx_factory)             | shipped (0.9, ADR-010)
expose_agent::<A>(agent, ctx_factory) (auto-derive via schemars) | shipped (0.9, behind schemars feature)
HTTP transport (Streamable HTTP, JSON-only)                      | shipped (0.10, http feature, ADR-012)
SSE streaming responses                                          | shipped (0.12, http feature, ADR-014)
Client roots + sampling (stdio)                                  | shipped (0.26, ADR-027)
Client roots + sampling (HTTP)                                   | shipped (0.27, ADR-028)
SSE resumption (Last-Event-Id)                                   | shipped (0.32, ADR-033)
Multi-tenant auth                                                | deferred

Roots + sampling (server→client requests)

klieo-mcp-server 0.26+ supports two MCP server-initiated request flows:

  • Roots: the client advertises filesystem (or URI) scopes the server is allowed to operate on. Server reads via McpServer::client_roots() and subscribes to changes via McpServer::subscribe_root_changes().
  • Sampling: server asks the client to make an LLM completion on its behalf (the client owns the API keys). Opt in via McpServerBuilder::with_client_sampling(). Tool handlers reach sampling through ctx.server_outbound.sample(req).await (McpOutboundExt trait).

Both flows are carried over stdio (0.26) and streamable HTTP (0.27).

HTTP support (added 0.27, ADR-028)

The streamable-HTTP transport now carries roots + sampling over a POST /mcp + GET /mcp SSE endpoint pair:

  • POST /mcp accepts initialize, tools/list, tools/call, and responses to server-initiated outbound requests.
  • GET /mcp opens a long-lived SSE stream that carries server-initiated outbound requests (sampling/createMessage, roots/list) and notifications back to the client.
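Because outbound requests leave on the SSE stream while their responses come back on POST /mcp, something has to pair the two by JSON-RPC id. The following is a minimal sketch of that correlation, assuming numeric ids and std channels; PendingOutbound is a hypothetical type and does not describe the crate's internals.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Tracks server-initiated requests that still await a client response.
/// Hypothetical type for illustration; not the crate's internal design.
#[derive(Default)]
struct PendingOutbound {
    next_id: u64,
    waiters: HashMap<u64, Sender<String>>,
}

impl PendingOutbound {
    /// Register an outbound request that is about to go out on the SSE stream.
    /// Returns the JSON-RPC id to use and a receiver for the eventual response.
    fn register(&mut self) -> (u64, Receiver<String>) {
        self.next_id += 1;
        let (tx, rx) = channel();
        self.waiters.insert(self.next_id, tx);
        (self.next_id, rx)
    }

    /// Route a response that arrived via POST to whoever is waiting for it.
    /// Returns false when the id is unknown (already answered or never sent).
    fn resolve(&mut self, id: u64, result_json: String) -> bool {
        match self.waiters.remove(&id) {
            Some(tx) => tx.send(result_json).is_ok(),
            None => false,
        }
    }
}
```

Removing the waiter on first resolution means a duplicate or late response for the same id is rejected rather than delivered twice.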

An Mcp-Session-Id header is minted by the server on the first successful initialize. The client echoes it on every subsequent POST /mcp and on the SSE GET /mcp. In the original 0.27 scope (ADR-028) the server ran single-session: a second concurrent initialize while a session was open returned 409 Conflict. The multi-session upgrade (ADR-031) lifted that restriction. Per-session idle timeout defaults to 5 minutes and is configurable via McpServerBuilder::with_session_idle_timeout(Duration) (Duration::ZERO disables the timeout).
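The idle-timeout rule, including the Duration::ZERO opt-out, can be sketched as a pure function. session_expired is a hypothetical helper written for illustration; how the crate actually tracks session activity is not specified here.

```rust
use std::time::{Duration, Instant};

/// Decide whether a session has idled out.
/// A zero timeout disables expiry, mirroring the Duration::ZERO rule.
/// Hypothetical helper; the crate's real bookkeeping may differ.
fn session_expired(last_activity: Instant, now: Instant, idle_timeout: Duration) -> bool {
    if idle_timeout.is_zero() {
        return false;
    }
    // saturating_duration_since returns zero if `now` precedes `last_activity`.
    now.saturating_duration_since(last_activity) >= idle_timeout
}
```

With the default of 5 minutes, a session last touched 301 seconds ago would be considered expired, while the same session under Duration::ZERO would never be.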

Authentication (when configured) gates both POST /mcp and GET /mcp uniformly.

Streamable HTTP supports multiple concurrent Mcp-Session-Ids:

  • A configurable global cap (default 1024) and a per-principal sub-cap (default max_sessions / 16, floored at 1, configurable via with_max_sessions_per_principal) bound the number of sessions.
  • The DELETE /mcp endpoint tears down a session on demand.
  • SSE resumption ships via Last-Event-Id replay (ADR-033): disconnected clients reconnect with the standard header and the server replays the strictly-newer slice of a bounded per-session buffer.
  • The outbound queue is bounded (drop-oldest, 1024 frames); ADR-029 records the JSON-RPC code renumber that landed alongside the bounded queue.

See ADR-028 for the original (single-session) HTTP scope, ADR-031 for the multi-session upgrade, ADR-032 for the per-principal sub-cap, and ADR-033 for resume semantics.
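The per-principal default and the strictly-newer replay rule can be made concrete with a short sketch. ReplayBuffer and default_per_principal_cap are hypothetical names; the numeric event ids and the drop-oldest eviction of the replay buffer are assumptions made for illustration (the source states drop-oldest only for the outbound queue).

```rust
use std::collections::VecDeque;

/// Default per-principal cap: max_sessions / 16, floored at 1.
fn default_per_principal_cap(max_sessions: usize) -> usize {
    (max_sessions / 16).max(1)
}

/// Bounded per-session buffer of (event id, frame) pairs.
/// Hypothetical sketch; eviction policy and id type are assumptions.
struct ReplayBuffer {
    capacity: usize,
    events: VecDeque<(u64, String)>,
}

impl ReplayBuffer {
    fn new(capacity: usize) -> Self {
        Self { capacity, events: VecDeque::with_capacity(capacity) }
    }

    /// Append an event, dropping the oldest one when the buffer is full.
    fn push(&mut self, id: u64, frame: String) {
        if self.capacity == 0 {
            return;
        }
        if self.events.len() >= self.capacity {
            self.events.pop_front();
        }
        self.events.push_back((id, frame));
    }

    /// Frames whose id is strictly greater than the client's Last-Event-Id.
    fn replay_after(&self, last_event_id: u64) -> Vec<&str> {
        self.events
            .iter()
            .filter(|(id, _)| *id > last_event_id)
            .map(|(_, frame)| frame.as_str())
            .collect()
    }
}
```

Under these assumptions the default global cap of 1024 gives a per-principal cap of 64, and a client that reconnects with an id older than anything still buffered receives only what the buffer retained.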

Example: examples/mcp-sampling-tool — a single tool that asks the client for a one-line summary. Run with --http <addr> to use the HTTP transport variant (cargo run -p mcp-sampling-tool -- --http 127.0.0.1:8765). Recipe walkthrough lives in the book at recipes/mcp-sampling-tool.md.

Agent exposure (ADR-010)

use std::sync::Arc;
use klieo_mcp_server::McpServer;
# use klieo_core::agent::{Agent, AgentContext};

# async fn _ex<A: Agent + 'static>(agent: A, ctx_template: AgentContext)
# where
#     A::Input: serde::de::DeserializeOwned + schemars::JsonSchema + Send + 'static,
#     A::Output: serde::Serialize + Send + 'static,
# {
// Auto-derive variant (requires `schemars` feature on klieo-mcp-server
// and `#[derive(JsonSchema)]` on A::Input):
let server = McpServer::expose_agent(agent, Arc::new(move || ctx_template.clone()));
server.serve_stdio().await.expect("server loop");
# }

For agents whose Input type cannot derive JsonSchema, pass the schema explicitly via expose_agent_with_schema(agent, schema, ctx_factory). The ctx_factory closure mints a fresh AgentContext per tools/call (so each invocation gets its own RunId).

See ADR-010 for the design trade-off.
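The per-call factory pattern can be shown in isolation with std types only. DemoContext, DemoCtxFactory, counting_factory, and handle_call are hypothetical stand-ins; they are not klieo_core::agent::AgentContext or any klieo API, and the integer run id is an assumption made for the sketch.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

/// Stand-in for an agent context; only the run id matters here.
/// Hypothetical type, not klieo_core::agent::AgentContext.
#[derive(Debug, Clone, PartialEq)]
struct DemoContext {
    run_id: u64,
}

/// A factory is a shareable closure that builds a fresh context per call.
type DemoCtxFactory = Arc<dyn Fn() -> DemoContext + Send + Sync>;

/// Build a factory whose contexts carry monotonically increasing run ids.
fn counting_factory() -> DemoCtxFactory {
    let next = AtomicU64::new(1);
    Arc::new(move || DemoContext {
        run_id: next.fetch_add(1, Ordering::Relaxed),
    })
}

/// Simulate handling one tools/call: mint a context, then run with it.
fn handle_call(factory: &DemoCtxFactory) -> u64 {
    let ctx = factory();
    ctx.run_id
}
```

Calling handle_call twice with the same factory yields two distinct run ids, which is the property the ctx_factory closure is meant to provide for each invocation.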

HTTP transport (cluster P-3, 0.10)

klieo-mcp-server ships an HTTP transport behind the http cargo feature. Streamable HTTP per the MCP 2025-03-26 spec, JSON-response branch only (no SSE upgrade in 0.10).

[dependencies]
klieo-mcp-server = { version = "2", features = ["http"] }

use klieo_mcp_server::McpServer;
use std::sync::Arc;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let server = McpServer::builder()
        .add_tools(my_invoker())
        .build_arc()?;
    // Bind loopback — no auth on this transport.
    server.serve_http("127.0.0.1:3001".parse()?).await?;
    Ok(())
}

Security boundary. No auth. Bind 127.0.0.1 and front with an auth-enforcing reverse proxy for any non-localhost deployment. Plain HTTP only — terminate TLS at the proxy.

Streaming agent responses

MCP tools/call against an agent invoker upgrades to SSE when the client opts in via params._meta.progressToken (per MCP spec §6.4). The server emits one notifications/progress frame per agent step (LLM call start/end, tool call start/end) followed by a terminal tools/call result frame, then closes.

Request

{
  "jsonrpc": "2.0", "id": 7, "method": "tools/call",
  "params": {
    "name": "my-agent",
    "arguments": { "prompt": "hello" },
    "_meta": { "progressToken": "tok-abc" }
  }
}

Response (text/event-stream)

event: progress
data: {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"tok-abc","data":{"kind":"llm_call_started"}}}

event: progress
data: {"jsonrpc":"2.0","method":"notifications/progress","params":{"progressToken":"tok-abc","data":{"kind":"llm_call_completed","tokens":42,"latency_ms":180}}}

event: result
data: {"jsonrpc":"2.0","id":7,"result":{"content":[{"type":"text","text":"final reply"}]}}
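A client consuming a stream like the one above has to split it into events and watch for the terminal result frame. The following std-only sketch shows one way to do that; SseEvent and parse_sse are hypothetical names, and the parser handles only the event: and data: fields rather than the full SSE grammar.

```rust
/// One parsed server-sent event.
#[derive(Debug, PartialEq)]
struct SseEvent {
    event: String,
    data: String,
}

/// Parse a text/event-stream body into events.
/// Minimal sketch: handles only `event:` and `data:` fields and treats a
/// blank line as the event terminator. It trims all leading whitespace
/// after the colon, which is a simplification of the SSE rules.
fn parse_sse(body: &str) -> Vec<SseEvent> {
    let mut out = Vec::new();
    let mut event = String::new();
    let mut data: Vec<&str> = Vec::new();
    for line in body.lines() {
        if line.is_empty() {
            // Blank line: dispatch the accumulated event, if it has data.
            if !data.is_empty() {
                out.push(SseEvent {
                    event: std::mem::take(&mut event),
                    data: data.join("\n"),
                });
                data.clear();
            }
            event.clear();
        } else if let Some(v) = line.strip_prefix("event:") {
            event = v.trim_start().to_string();
        } else if let Some(v) = line.strip_prefix("data:") {
            data.push(v.trim_start());
        }
    }
    out
}
```

A caller would typically forward every event named progress to its progress handler and stop reading once an event named result arrives, since the server closes the stream after that frame.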

Backward compatibility

Requests without _meta.progressToken get the existing JSON response — no behaviour change for non-streaming clients.

Limitations

  • SSE resumption via Last-Event-ID was not honoured when streaming shipped in 0.12; clients redrove tools/call from scratch on disconnect. Last-Event-Id replay has since shipped (0.32, ADR-033).
  • Cross-replica fanout is out of scope — broadcast is in-process only.
  • Client TCP disconnect does NOT propagate to ctx.cancel; the agent completes in the background.
  • Streaming-path LlmCallCompleted carries tokens: 0 until usage plumbing is wired (tracked).

See docs/adr/adr-014-mcp-streaming-agent-responses.md for the design rationale and the acknowledged end-to-end test coverage gap.

Shared bus

Pass a ToolCtxFactory to expose tools that need real Pubsub / KvStore / JobQueue access (not the per-request in-memory default):

use klieo_mcp_server::{McpServer, ToolCtxFactory};
use klieo_core::tool::ToolCtx;
use std::sync::Arc;

# fn my_invoker() -> Arc<dyn klieo_core::tool::ToolInvoker> { unimplemented!() }
# fn my_nats_pubsub() -> impl klieo_core::bus::Pubsub { unimplemented!() }
# fn my_nats_kv() -> impl klieo_core::bus::KvStore { unimplemented!() }
# fn my_nats_jobs() -> impl klieo_core::bus::JobQueue { unimplemented!() }
let nats_pubsub = Arc::new(my_nats_pubsub());
let nats_kv     = Arc::new(my_nats_kv());
let nats_jobs   = Arc::new(my_nats_jobs());

let factory: ToolCtxFactory = Arc::new(move || ToolCtx {
    pubsub: nats_pubsub.clone(),
    kv:     nats_kv.clone(),
    jobs:   nats_jobs.clone(),
});

let server = McpServer::builder()
    .with_tool_ctx_factory(factory)
    .add_tools(my_invoker())
    .build_arc()?;

Applies to both serve_stdio and serve_http.

Tool context

Each tools/call runs against an in-memory ToolCtx (fresh MemoryBus per request) unless you override via with_tool_ctx_factory as shown above. The default is unchanged for all existing stdio adopters.

Limitations

  • The MCP-protocol response shape flattens tool output as a single content[0].text block. Tools returning structured JSON are serialised via serde_json::Value::to_string — MCP clients expecting structured responses should decode the text.
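  • No streaming responses for plain tools; tools that stream must accumulate their output before returning. Agent invokers can stream progress frames, as described under Streaming agent responses.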